This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9da3221a794 [HUDI-8781] Optimize executor memory usage during 
executing clustering (#12515)
9da3221a794 is described below

commit 9da3221a79465f3326ae3ac206b08d60864ddcaa
Author: TheR1sing3un <[email protected]>
AuthorDate: Thu Dec 19 15:58:04 2024 +0800

    [HUDI-8781] Optimize executor memory usage during executing clustering 
(#12515)
    
    * perf: optimize executor memory usage during executing clustering
    
    1. optimize executor memory usage during executing clustering
    
    Signed-off-by: TheR1sing3un <[email protected]>
    
    * Cosmetic changes
    
    ---------
    
    Signed-off-by: TheR1sing3un <[email protected]>
    Co-authored-by: danny0405 <[email protected]>
---
 .../hudi/client/utils/ConcatenatingIterator.java   |   2 +-
 .../client/utils/LazyConcatenatingIterator.java    |  98 +++++++++++++++++
 .../hudi/utils/TestLazyConcatenatingIterator.java  | 114 ++++++++++++++++++++
 .../MultipleSparkJobExecutionStrategy.java         | 116 +++++++++++----------
 4 files changed, 274 insertions(+), 56 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
index aa6c29b0844..a380fa35adf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
@@ -27,7 +27,7 @@ import java.util.Queue;
 
 /**
  * Provides iterator interface over List of iterators. Consumes all records 
from first iterator element
- * before moving to next iterator in the list. That is concatenate elements 
across multiple iterators.
+ * before moving to next iterator in the list. That is concatenating elements 
across multiple iterators.
  *
  * @param <T>
  */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
new file mode 100644
index 00000000000..048c315f276
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+
+/**
+ * Provides an iterator interface over a list of iterator suppliers. Consumes all 
records from the current iterator
+ * before moving on to the next one in the list, that is, it concatenates elements 
across multiple iterators.
+ *
+ * <p>Unlike {@link ConcatenatingIterator}, the internal iterators are 
instantiated lazily.
+ */
+public class LazyConcatenatingIterator<T> implements ClosableIterator<T> {
+
+  private final Queue<Supplier<ClosableIterator<T>>> iteratorSuppliers;
+
+  private ClosableIterator<T> itr;
+
+  private boolean initialed = false;
+
+  private boolean closed = false;
+
+  public LazyConcatenatingIterator(List<Supplier<ClosableIterator<T>>> 
iteratorSuppliers) {
+    this.iteratorSuppliers = new LinkedList<>(iteratorSuppliers);
+  }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      if (itr != null) {
+        itr.close();
+        itr = null;
+      }
+      iteratorSuppliers.clear();
+      closed = true;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    init();
+    while (itr != null) {
+      if (itr.hasNext()) {
+        return true;
+      }
+      // close current iterator
+      this.itr.close();
+      if (!iteratorSuppliers.isEmpty()) {
+        // move to the next
+        itr = iteratorSuppliers.poll().get();
+      } else {
+        itr = null;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public T next() {
+    ValidationUtils.checkState(hasNext(), "No more elements left");
+    return itr.next();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void init() {
+    if (!initialed) {
+      if (!this.iteratorSuppliers.isEmpty()) {
+        this.itr = iteratorSuppliers.poll().get();
+      }
+      initialed = true;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
new file mode 100644
index 00000000000..fa1a37d0277
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestLazyConcatenatingIterator {
+
+  int initTimes;
+  int closeTimes;
+
+  private class MockClosableIterator implements ClosableIterator {
+
+    Iterator<Integer> iterator;
+
+    public MockClosableIterator(Iterator<Integer> iterator) {
+      initTimes++;
+      this.iterator = iterator;
+    }
+
+    @Override
+    public void close() {
+      closeTimes++;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public Object next() {
+      return iterator.next();
+    }
+  }
+
+  // Simple test for iterator concatenation
+  @Test
+  public void testConcatBasic() {
+    Supplier<ClosableIterator<Integer>> i1 = () -> new 
MockClosableIterator(Arrays.asList(5, 3, 2, 1).iterator());
+    Supplier<ClosableIterator<Integer>> i2 = () -> new 
MockClosableIterator(Collections.emptyIterator()); // empty iterator
+    Supplier<ClosableIterator<Integer>> i3 = () -> new 
MockClosableIterator(Collections.singletonList(3).iterator());
+
+    LazyConcatenatingIterator<Integer> ci = new 
LazyConcatenatingIterator<>(Arrays.asList(i1, i2, i3));
+
+    assertEquals(0, initTimes);
+
+    List<Integer> allElements = new ArrayList<>();
+    int count = 0;
+    while (ci.hasNext()) {
+      count++;
+      if (count == 1) {
+        assertEquals(1, initTimes);
+        assertEquals(0, closeTimes);
+      }
+      if (count == 5) {
+        assertEquals(3, initTimes);
+        assertEquals(2, closeTimes);
+      }
+      allElements.add(ci.next());
+    }
+
+    assertEquals(3, initTimes);
+    assertEquals(3, closeTimes);
+
+    assertEquals(5, allElements.size());
+    assertEquals(Arrays.asList(5, 3, 2, 1, 3), allElements);
+  }
+
+  @Test
+  public void testConcatError() {
+    Supplier<ClosableIterator<Integer>> i1 = () -> new 
MockClosableIterator(Collections.emptyIterator()); // empty iterator
+
+    LazyConcatenatingIterator<Integer> ci = new 
LazyConcatenatingIterator<>(Collections.singletonList(i1));
+    assertFalse(ci.hasNext());
+    try {
+      ci.next();
+      fail("expected error for empty iterator");
+    } catch (IllegalStateException e) {
+      //
+    }
+  }
+
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c28a9736fa..552041b8ecf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -26,7 +26,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.client.utils.LazyConcatenatingIterator;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.SerializableSchema;
@@ -111,6 +111,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -336,44 +337,47 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     int readParallelism = 
Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
 
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, 
readParallelism).mapPartitions(clusteringOpsPartition -> {
-      List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
+      List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new 
ArrayList<>();
       clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new 
SparkTaskContextSupplier(), config);
-        LOG.info("MaxMemoryPerCompaction run as part of clustering => " + 
maxMemoryPerCompaction);
-        try {
-          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
-          HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(table.getStorage())
-              .withBasePath(table.getMetaClient().getBasePath())
-              .withLogFilePaths(clusteringOp.getDeltaFilePaths())
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(instantTime)
-              .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
-              .withReverseReader(config.getCompactionReverseLogReadEnabled())
-              .withBufferSize(config.getMaxDFSStreamBufferSize())
-              .withSpillableMapBasePath(config.getSpillableMapBasePath())
-              .withPartition(clusteringOp.getPartitionPath())
-              
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-              
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-              
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-              .withRecordMerger(config.getRecordMerger())
-              .withTableMetaClient(table.getMetaClient())
-              .build();
-
-          Option<HoodieFileReader> baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
-              ? Option.empty()
-              : Option.of(getBaseOrBootstrapFileReader(storageConf, 
bootstrapBasePath, partitionFields, clusteringOp));
-          recordIterators.add(new HoodieFileSliceReader(baseFileReader, 
scanner, readerSchema, tableConfig.getPreCombineField(), 
config.getRecordMerger(),
-              tableConfig.getProps(),
-              tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-                  tableConfig.getPartitionFieldProp()))));
-        } catch (IOException e) {
-          throw new HoodieClusteringException("Error reading input data for " 
+ clusteringOp.getDataFilePath()
-              + " and " + clusteringOp.getDeltaFilePaths(), e);
-        }
-      });
 
-      return new ConcatenatingIterator<>(recordIterators);
+        Supplier<ClosableIterator<HoodieRecord<T>>> iteratorSupplier = () -> {
+          long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new 
SparkTaskContextSupplier(), config);
+          LOG.info("MaxMemoryPerCompaction run as part of clustering => " + 
maxMemoryPerCompaction);
+          try {
+            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
+            HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+                .withStorage(table.getStorage())
+                .withBasePath(table.getMetaClient().getBasePath())
+                .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+                .withReaderSchema(readerSchema)
+                .withLatestInstantTime(instantTime)
+                .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+                .withReverseReader(config.getCompactionReverseLogReadEnabled())
+                .withBufferSize(config.getMaxDFSStreamBufferSize())
+                .withSpillableMapBasePath(config.getSpillableMapBasePath())
+                .withPartition(clusteringOp.getPartitionPath())
+                
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
+                
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+                
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+                .withRecordMerger(config.getRecordMerger())
+                .withTableMetaClient(table.getMetaClient())
+                .build();
+
+            Option<HoodieFileReader> baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+                ? Option.empty()
+                : Option.of(getBaseOrBootstrapFileReader(storageConf, 
bootstrapBasePath, partitionFields, clusteringOp));
+            return new HoodieFileSliceReader(baseFileReader, scanner, 
readerSchema, tableConfig.getPreCombineField(), config.getRecordMerger(),
+                tableConfig.getProps(),
+                tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                    tableConfig.getPartitionFieldProp())));
+          } catch (IOException e) {
+            throw new HoodieClusteringException("Error reading input data for 
" + clusteringOp.getDataFilePath()
+                + " and " + clusteringOp.getDeltaFilePaths(), e);
+          }
+        };
+        suppliers.add(iteratorSupplier);
+      });
+      return new LazyConcatenatingIterator<>(suppliers);
     }));
   }
 
@@ -395,27 +399,29 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
 
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
         .mapPartitions(clusteringOpsPartition -> {
-          List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new 
ArrayList<>();
+          List<Supplier<ClosableIterator<HoodieRecord<T>>>> 
iteratorGettersForPartition = new ArrayList<>();
           clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-            try {
-              Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(writeConfig.getSchema()));
-              HoodieFileReader baseFileReader = 
getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, 
clusteringOp);
-
-              Option<BaseKeyGenerator> keyGeneratorOp = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
-              // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
-              //       payload pointing into a shared, mutable (underlying) 
buffer we get a clean copy of
-              //       it since these records will be shuffled later.
-              CloseableMappingIterator mappingIterator = new 
CloseableMappingIterator(
-                  (ClosableIterator<HoodieRecord>) 
baseFileReader.getRecordIterator(readerSchema),
-                  rec -> ((HoodieRecord) 
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, 
writeConfig.getProps(), keyGeneratorOp));
-              iteratorsForPartition.add(mappingIterator);
-            } catch (IOException e) {
-              throw new HoodieClusteringException("Error reading input data 
for " + clusteringOp.getDataFilePath()
-                  + " and " + clusteringOp.getDeltaFilePaths(), e);
-            }
+            Supplier<ClosableIterator<HoodieRecord<T>>> recordIteratorGetter = 
() -> {
+              try {
+                Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(writeConfig.getSchema()));
+                HoodieFileReader baseFileReader = 
getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, 
clusteringOp);
+
+                Option<BaseKeyGenerator> keyGeneratorOp = 
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
+                // NOTE: Records have to be cloned here so that, if a record 
holds a low-level engine-specific
+                //       payload pointing into a shared, mutable (underlying) 
buffer, we get a clean copy of
+                //       it, since these records will be shuffled later.
+                return new CloseableMappingIterator(
+                    (ClosableIterator<HoodieRecord>) 
baseFileReader.getRecordIterator(readerSchema),
+                    rec -> ((HoodieRecord) 
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, 
writeConfig.getProps(), keyGeneratorOp));
+              } catch (IOException e) {
+                throw new HoodieClusteringException("Error reading input data 
for " + clusteringOp.getDataFilePath()
+                    + " and " + clusteringOp.getDeltaFilePaths(), e);
+              }
+            };
+            iteratorGettersForPartition.add(recordIteratorGetter);
           });
 
-          return new ConcatenatingIterator<>(iteratorsForPartition);
+          return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
         }));
   }
 

Reply via email to