This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9da3221a794 [HUDI-8781] Optimize executor memory usage during
executing clustering (#12515)
9da3221a794 is described below
commit 9da3221a79465f3326ae3ac206b08d60864ddcaa
Author: TheR1sing3un <[email protected]>
AuthorDate: Thu Dec 19 15:58:04 2024 +0800
[HUDI-8781] Optimize executor memory usage during executing clustering
(#12515)
* perf: optimize executor memory usage during executing clustering
1. optimize executor memory usage during executing clustering
Signed-off-by: TheR1sing3un <[email protected]>
* Cosmetic changes
---------
Signed-off-by: TheR1sing3un <[email protected]>
Co-authored-by: danny0405 <[email protected]>
---
.../hudi/client/utils/ConcatenatingIterator.java | 2 +-
.../client/utils/LazyConcatenatingIterator.java | 98 +++++++++++++++++
.../hudi/utils/TestLazyConcatenatingIterator.java | 114 ++++++++++++++++++++
.../MultipleSparkJobExecutionStrategy.java | 116 +++++++++++----------
4 files changed, 274 insertions(+), 56 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
index aa6c29b0844..a380fa35adf 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
@@ -27,7 +27,7 @@ import java.util.Queue;
/**
* Provides iterator interface over List of iterators. Consumes all records
from first iterator element
- * before moving to next iterator in the list. That is concatenate elements
across multiple iterators.
+ * before moving to next iterator in the list. That is concatenating elements
across multiple iterators.
*
* @param <T>
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
new file mode 100644
index 00000000000..048c315f276
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+
+/**
+ * Provides iterator interface over List of iterators. Consumes all records
from first iterator element
+ * before moving to next iterator in the list. That is concatenating elements
across multiple iterators.
+ *
+ * <p>Different from {@link ConcatenatingIterator}, the internal iterators are
instantiated lazily.
+ */
+public class LazyConcatenatingIterator<T> implements ClosableIterator<T> {
+
+ private final Queue<Supplier<ClosableIterator<T>>> iteratorSuppliers;
+
+ private ClosableIterator<T> itr;
+
+ private boolean initialed = false;
+
+ private boolean closed = false;
+
+ public LazyConcatenatingIterator(List<Supplier<ClosableIterator<T>>>
iteratorSuppliers) {
+ this.iteratorSuppliers = new LinkedList<>(iteratorSuppliers);
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ if (itr != null) {
+ itr.close();
+ itr = null;
+ }
+ iteratorSuppliers.clear();
+ closed = true;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ init();
+ while (itr != null) {
+ if (itr.hasNext()) {
+ return true;
+ }
+ // close current iterator
+ this.itr.close();
+ if (!iteratorSuppliers.isEmpty()) {
+ // move to the next
+ itr = iteratorSuppliers.poll().get();
+ } else {
+ itr = null;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public T next() {
+ ValidationUtils.checkState(hasNext(), "No more elements left");
+ return itr.next();
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void init() {
+ if (!initialed) {
+ if (!this.iteratorSuppliers.isEmpty()) {
+ this.itr = iteratorSuppliers.poll().get();
+ }
+ initialed = true;
+ }
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
new file mode 100644
index 00000000000..fa1a37d0277
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestLazyConcatenatingIterator {
+
+ int initTimes;
+ int closeTimes;
+
+ private class MockClosableIterator implements ClosableIterator {
+
+ Iterator<Integer> iterator;
+
+ public MockClosableIterator(Iterator<Integer> iterator) {
+ initTimes++;
+ this.iterator = iterator;
+ }
+
+ @Override
+ public void close() {
+ closeTimes++;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Object next() {
+ return iterator.next();
+ }
+ }
+
+ // Simple test for iterator concatenation
+ @Test
+ public void testConcatBasic() {
+ Supplier<ClosableIterator<Integer>> i1 = () -> new
MockClosableIterator(Arrays.asList(5, 3, 2, 1).iterator());
+ Supplier<ClosableIterator<Integer>> i2 = () -> new
MockClosableIterator(Collections.emptyIterator()); // empty iterator
+ Supplier<ClosableIterator<Integer>> i3 = () -> new
MockClosableIterator(Collections.singletonList(3).iterator());
+
+ LazyConcatenatingIterator<Integer> ci = new
LazyConcatenatingIterator<>(Arrays.asList(i1, i2, i3));
+
+ assertEquals(0, initTimes);
+
+ List<Integer> allElements = new ArrayList<>();
+ int count = 0;
+ while (ci.hasNext()) {
+ count++;
+ if (count == 1) {
+ assertEquals(1, initTimes);
+ assertEquals(0, closeTimes);
+ }
+ if (count == 5) {
+ assertEquals(3, initTimes);
+ assertEquals(2, closeTimes);
+ }
+ allElements.add(ci.next());
+ }
+
+ assertEquals(3, initTimes);
+ assertEquals(3, closeTimes);
+
+ assertEquals(5, allElements.size());
+ assertEquals(Arrays.asList(5, 3, 2, 1, 3), allElements);
+ }
+
+ @Test
+ public void testConcatError() {
+ Supplier<ClosableIterator<Integer>> i1 = () -> new
MockClosableIterator(Collections.emptyIterator()); // empty iterator
+
+ LazyConcatenatingIterator<Integer> ci = new
LazyConcatenatingIterator<>(Collections.singletonList(i1));
+ assertFalse(ci.hasNext());
+ try {
+ ci.next();
+ fail("expected error for empty iterator");
+ } catch (IllegalStateException e) {
+ //
+ }
+ }
+
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c28a9736fa..552041b8ecf 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -26,7 +26,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.SerializableSchema;
@@ -111,6 +111,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -336,44 +337,47 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
int readParallelism =
Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
return HoodieJavaRDD.of(jsc.parallelize(clusteringOps,
readParallelism).mapPartitions(clusteringOpsPartition -> {
- List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
+ List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new
ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
- long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new
SparkTaskContextSupplier(), config);
- LOG.info("MaxMemoryPerCompaction run as part of clustering => " +
maxMemoryPerCompaction);
- try {
- Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()));
- HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(table.getStorage())
- .withBasePath(table.getMetaClient().getBasePath())
- .withLogFilePaths(clusteringOp.getDeltaFilePaths())
- .withReaderSchema(readerSchema)
- .withLatestInstantTime(instantTime)
- .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
- .withReverseReader(config.getCompactionReverseLogReadEnabled())
- .withBufferSize(config.getMaxDFSStreamBufferSize())
- .withSpillableMapBasePath(config.getSpillableMapBasePath())
- .withPartition(clusteringOp.getPartitionPath())
-
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
- .withRecordMerger(config.getRecordMerger())
- .withTableMetaClient(table.getMetaClient())
- .build();
-
- Option<HoodieFileReader> baseFileReader =
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
- ? Option.empty()
- : Option.of(getBaseOrBootstrapFileReader(storageConf,
bootstrapBasePath, partitionFields, clusteringOp));
- recordIterators.add(new HoodieFileSliceReader(baseFileReader,
scanner, readerSchema, tableConfig.getPreCombineField(),
config.getRecordMerger(),
- tableConfig.getProps(),
- tableConfig.populateMetaFields() ? Option.empty() :
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
- tableConfig.getPartitionFieldProp()))));
- } catch (IOException e) {
- throw new HoodieClusteringException("Error reading input data for "
+ clusteringOp.getDataFilePath()
- + " and " + clusteringOp.getDeltaFilePaths(), e);
- }
- });
- return new ConcatenatingIterator<>(recordIterators);
+ Supplier<ClosableIterator<HoodieRecord<T>>> iteratorSupplier = () -> {
+ long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new
SparkTaskContextSupplier(), config);
+ LOG.info("MaxMemoryPerCompaction run as part of clustering => " +
maxMemoryPerCompaction);
+ try {
+ Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()));
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withStorage(table.getStorage())
+ .withBasePath(table.getMetaClient().getBasePath())
+ .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+ .withReaderSchema(readerSchema)
+ .withLatestInstantTime(instantTime)
+ .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+ .withReverseReader(config.getCompactionReverseLogReadEnabled())
+ .withBufferSize(config.getMaxDFSStreamBufferSize())
+ .withSpillableMapBasePath(config.getSpillableMapBasePath())
+ .withPartition(clusteringOp.getPartitionPath())
+
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
+
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+ .withRecordMerger(config.getRecordMerger())
+ .withTableMetaClient(table.getMetaClient())
+ .build();
+
+ Option<HoodieFileReader> baseFileReader =
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+ ? Option.empty()
+ : Option.of(getBaseOrBootstrapFileReader(storageConf,
bootstrapBasePath, partitionFields, clusteringOp));
+ return new HoodieFileSliceReader(baseFileReader, scanner,
readerSchema, tableConfig.getPreCombineField(), config.getRecordMerger(),
+ tableConfig.getProps(),
+ tableConfig.populateMetaFields() ? Option.empty() :
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+ tableConfig.getPartitionFieldProp())));
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading input data for
" + clusteringOp.getDataFilePath()
+ + " and " + clusteringOp.getDeltaFilePaths(), e);
+ }
+ };
+ suppliers.add(iteratorSupplier);
+ });
+ return new LazyConcatenatingIterator<>(suppliers);
}));
}
@@ -395,27 +399,29 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
.mapPartitions(clusteringOpsPartition -> {
- List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new
ArrayList<>();
+ List<Supplier<ClosableIterator<HoodieRecord<T>>>>
iteratorGettersForPartition = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
- try {
- Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(writeConfig.getSchema()));
- HoodieFileReader baseFileReader =
getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields,
clusteringOp);
-
- Option<BaseKeyGenerator> keyGeneratorOp =
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
- // NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
- // payload pointing into a shared, mutable (underlying)
buffer we get a clean copy of
- // it since these records will be shuffled later.
- CloseableMappingIterator mappingIterator = new
CloseableMappingIterator(
- (ClosableIterator<HoodieRecord>)
baseFileReader.getRecordIterator(readerSchema),
- rec -> ((HoodieRecord)
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
writeConfig.getProps(), keyGeneratorOp));
- iteratorsForPartition.add(mappingIterator);
- } catch (IOException e) {
- throw new HoodieClusteringException("Error reading input data
for " + clusteringOp.getDataFilePath()
- + " and " + clusteringOp.getDeltaFilePaths(), e);
- }
+ Supplier<ClosableIterator<HoodieRecord<T>>> recordIteratorGetter =
() -> {
+ try {
+ Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(writeConfig.getSchema()));
+ HoodieFileReader baseFileReader =
getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields,
clusteringOp);
+
+ Option<BaseKeyGenerator> keyGeneratorOp =
HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
+          // NOTE: Records have to be cloned here to make sure that if a record
holds a low-level engine-specific
+ // payload pointing into a shared, mutable (underlying)
buffer we get a clean copy of
+ // it since these records will be shuffled later.
+ return new CloseableMappingIterator(
+ (ClosableIterator<HoodieRecord>)
baseFileReader.getRecordIterator(readerSchema),
+ rec -> ((HoodieRecord)
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
writeConfig.getProps(), keyGeneratorOp));
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading input data
for " + clusteringOp.getDataFilePath()
+ + " and " + clusteringOp.getDeltaFilePaths(), e);
+ }
+ };
+ iteratorGettersForPartition.add(recordIteratorGetter);
});
- return new ConcatenatingIterator<>(iteratorsForPartition);
+ return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
}));
}