This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 24f0db68904 [HUDI-8678] feat: improve consistent-bucket resizing performance by reducing unnecessary record collecting (#12451)
24f0db68904 is described below
commit 24f0db68904b78ef10c7594b26660ddbcb0c00c7
Author: TheR1sing3un <[email protected]>
AuthorDate: Mon Jan 6 12:20:53 2025 +0800
[HUDI-8678] feat: improve consistent-bucket resizing performance by reducing unnecessary record collecting (#12451)
* feat: improve consistent-bucket resizing performance by reducing unnecessary record collecting
* refactor: remove ConsistentHashingBucketInsertPartitioner
---------
Signed-off-by: TheR1sing3un <[email protected]>
---
.../ConsistentHashingBucketInsertPartitioner.java | 33 -------------
...onsistentBucketClusteringExecutionStrategy.java | 23 ++++-----
...ntBucketIndexBulkInsertPartitionerWithRows.java | 43 +++++++++++------
.../RDDConsistentBucketBulkInsertPartitioner.java | 54 +++++++++++++++-------
4 files changed, 77 insertions(+), 76 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/ConsistentHashingBucketInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/ConsistentHashingBucketInsertPartitioner.java
deleted file mode 100644
index b71d8768a2d..00000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/ConsistentHashingBucketInsertPartitioner.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table;
-
-import org.apache.hudi.common.model.ConsistentHashingNode;
-
-import java.util.List;
-
-public interface ConsistentHashingBucketInsertPartitioner {
- /**
- * Set consistent hashing for partition, used in clustering
- *
- * @param partition partition to set Consistent Hashing nodes
- * @param nodes nodes from clustering plan
- */
- void addHashingChildrenNodes(String partition, List<ConsistentHashingNode>
nodes);
-}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index e897c8717e2..6b637350f88 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -26,11 +26,11 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import
org.apache.hudi.execution.bulkinsert.ConsistentBucketIndexBulkInsertPartitionerWithRows;
import
org.apache.hudi.execution.bulkinsert.RDDConsistentBucketBulkInsertPartitioner;
-import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
@@ -41,6 +41,7 @@ import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -72,10 +73,9 @@ public class SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
HoodieWriteConfig newConfig =
HoodieWriteConfig.newBuilder().withProps(props).build();
- ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner =
- new
ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(),
strategyParams, shouldPreserveHoodieMetadata);
-
- addHashingChildNodes(partitioner, extraMetadata);
+ Pair<String, List<ConsistentHashingNode>> childNodesPair =
extractChildNodes(extraMetadata);
+ ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new
ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(),
strategyParams, shouldPreserveHoodieMetadata,
+ Collections.singletonMap(childNodesPair.getKey(),
childNodesPair.getValue()));
Dataset<Row> repartitionedRecords =
partitioner.repartitionRecords(inputRecords, numOutputGroups);
@@ -94,20 +94,21 @@ public class SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(),
Boolean.FALSE.toString());
HoodieWriteConfig newConfig =
HoodieWriteConfig.newBuilder().withProps(props).build();
- RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new
RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams,
preserveHoodieMetadata);
- addHashingChildNodes(partitioner, extraMetadata);
+ Pair<String, List<ConsistentHashingNode>> childNodesPair =
extractChildNodes(extraMetadata);
+ RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new
RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams,
preserveHoodieMetadata,
+ Collections.singletonMap(childNodesPair.getKey(),
childNodesPair.getValue()));
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, partitioner, true, numOutputGroups);
}
- private void addHashingChildNodes(ConsistentHashingBucketInsertPartitioner
partitioner, Map<String, String> extraMetadata) {
+ private Pair<String/*partition*/, List<ConsistentHashingNode>>
extractChildNodes(Map<String, String> extraMetadata) {
try {
List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
-
partitioner.addHashingChildrenNodes(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
nodes);
+ return
Pair.of(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
nodes);
} catch (Exception e) {
- LOG.error("Failed to add hashing children nodes", e);
- throw new HoodieClusteringException("Failed to add hashing children
nodes", e);
+ LOG.error("Failed to extract hashing children nodes", e);
+ throw new HoodieClusteringException("Failed to extract hashing children
nodes", e);
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java
index b3a714eb5b1..24ef7fd1871 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java
@@ -31,7 +31,6 @@ import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.BucketSortBulkInsertPartitioner;
-import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.Partitioner;
@@ -42,7 +41,6 @@ import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -54,14 +52,13 @@ import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
/**
* Bulk_insert partitioner of Spark row using consistent hashing bucket index.
*/
-public class ConsistentBucketIndexBulkInsertPartitionerWithRows
- extends BucketSortBulkInsertPartitioner<Dataset<Row>> implements
ConsistentHashingBucketInsertPartitioner {
+public class ConsistentBucketIndexBulkInsertPartitionerWithRows extends
BucketSortBulkInsertPartitioner<Dataset<Row>> {
private final String indexKeyFields;
private final List<String> fileIdPfxList = new ArrayList<>();
- private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
+ private Map<String/*partition*/, List<ConsistentHashingNode>/*pending
resizing related child nodes*/> hashingChildrenNodes;
private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
@@ -73,12 +70,20 @@ public class ConsistentBucketIndexBulkInsertPartitionerWithRows
private final boolean populateMetaFields;
+ public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table,
+ Map<String,
String> strategyParams, boolean populateMetaFields) {
+ this(table, strategyParams, populateMetaFields, null);
+ }
+
+ /**
+ * Constructor of ConsistentBucketIndexBulkInsertPartitionerWithRows.
+ * @param hashingChildrenNodes children nodes for clustering, only used in
executing clustering
+ */
public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table,
Map<String,
String> strategyParams,
- boolean
populateMetaFields) {
+ boolean
populateMetaFields, Map<String, List<ConsistentHashingNode>>
hashingChildrenNodes) {
super(table, strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(),
""));
this.indexKeyFields = table.getConfig().getBucketIndexHashField();
- this.hashingChildrenNodes = new HashMap<>();
this.populateMetaFields = populateMetaFields;
if (!populateMetaFields) {
this.keyGeneratorOpt =
HoodieSparkKeyGeneratorFactory.getKeyGenerator(table.getConfig().getProps());
@@ -88,13 +93,25 @@ public class ConsistentBucketIndexBulkInsertPartitionerWithRows
this.extractor =
RowRecordKeyExtractor.getRowRecordKeyExtractor(populateMetaFields,
keyGeneratorOpt);
ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
"Consistent hash bucket index doesn't support CoW table");
+ if (hashingChildrenNodes != null) {
+ /**
+ * Set pending consistent hashing for partition.
+ * The bulk insert will directly use the pending metadata as the
consistent hash metadata for writing data to after-resizing buckets.
+ * NOTE: Only used in the case of executing bulk insert.
+ */
+
ValidationUtils.checkArgument(hashingChildrenNodes.values().stream().flatMap(List::stream).noneMatch(n
-> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL),
+ "children nodes should not be tagged as NORMAL");
+ this.hashingChildrenNodes = hashingChildrenNodes;
+ }
}
private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
HoodieSparkConsistentBucketIndex index =
(HoodieSparkConsistentBucketIndex) table.getIndex();
HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition,
index.getNumBuckets());
- if (hashingChildrenNodes.containsKey(partition)) {
+ if (hashingChildrenNodes != null) {
+ // for executing bucket resizing
+ ValidationUtils.checkState(hashingChildrenNodes.containsKey(partition),
"children nodes should be provided for clustering");
metadata.setChildrenNodes(hashingChildrenNodes.get(partition));
}
return new ConsistentBucketIdentifier(metadata);
@@ -152,17 +169,13 @@ public class ConsistentBucketIndexBulkInsertPartitionerWithRows
* the mapping from partition to its bucket identifier is constructed.
*/
private Map<String, ConsistentBucketIdentifier>
initializeBucketIdentifier(JavaRDD<Row> rows) {
+ if (hashingChildrenNodes != null) {
+ return hashingChildrenNodes.keySet().stream().collect(Collectors.toMap(p
-> p, this::getBucketIdentifier));
+ }
return
rows.map(this.extractor::getPartitionPath).distinct().collect().stream()
.collect(Collectors.toMap(p -> p, this::getBucketIdentifier));
}
- @Override
- public void addHashingChildrenNodes(String partition,
List<ConsistentHashingNode> nodes) {
- ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() ==
ConsistentHashingNode.NodeTag.NORMAL),
- "children nodes should not be tagged as NORMAL");
- hashingChildrenNodes.put(partition, nodes);
- }
-
private int getBucketId(Row row) {
String recordKey = extractor.getRecordKey(row);
String partitionPath = extractor.getPartitionPath(row);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
index 32b23751973..51c39105ce5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
@@ -32,7 +32,6 @@ import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.WriteHandleFactory;
-import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.Partitioner;
@@ -49,25 +48,44 @@ import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
/**
* A partitioner for (consistent hashing) bucket index used in bulk_insert
*/
-public class RDDConsistentBucketBulkInsertPartitioner<T> extends
RDDBucketIndexPartitioner<T> implements
ConsistentHashingBucketInsertPartitioner {
+public class RDDConsistentBucketBulkInsertPartitioner<T> extends
RDDBucketIndexPartitioner<T> {
- private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
+ private Map<String/*partition*/, List<ConsistentHashingNode/*pending
resizing related child nodes*/>> hashingChildrenNodes;
+
+ public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table) {
+ this(table, Collections.emptyMap(), false);
+ ValidationUtils.checkArgument(table.getIndex() instanceof
HoodieSparkConsistentBucketIndex,
+ "RDDConsistentBucketPartitioner can only be used together with
consistent hashing bucket index");
+ }
public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table,
Map<String, String>
strategyParams,
boolean
preserveHoodieMetadata) {
+ this(table, strategyParams, preserveHoodieMetadata, null);
+ }
+
+ /**
+ * Constructor of RDDConsistentBucketBulkInsertPartitioner.
+ * @param hashingChildrenNodes children nodes for clustering, only used in
executing clustering
+ */
+ public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table,
+ Map<String, String>
strategyParams,
+ boolean
preserveHoodieMetadata, Map<String, List<ConsistentHashingNode>>
hashingChildrenNodes) {
super(table,
strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(), null),
preserveHoodieMetadata);
ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
"Consistent hash bucket index doesn't support CoW table");
- this.hashingChildrenNodes = new HashMap<>();
- }
-
- public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table) {
- this(table, Collections.emptyMap(), false);
- ValidationUtils.checkArgument(table.getIndex() instanceof
HoodieSparkConsistentBucketIndex,
- "RDDConsistentBucketPartitioner can only be used together with
consistent hashing bucket index");
+ if (hashingChildrenNodes != null) {
+ /**
+ * Set pending consistent hashing for partition.
+ * The bulk insert will directly use the pending metadata as the
consistent hash metadata for writing data to after-resizing buckets.
+ * NOTE: Only used in the case of executing bulk insert.
+ */
+
ValidationUtils.checkArgument(hashingChildrenNodes.values().stream().flatMap(List::stream).noneMatch(n
-> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL),
+ "children nodes should not be tagged as NORMAL");
+ this.hashingChildrenNodes = hashingChildrenNodes;
+ }
}
/**
@@ -99,20 +117,19 @@ public class RDDConsistentBucketBulkInsertPartitioner<T> extends RDDBucketIndexP
});
}
- @Override
- public void addHashingChildrenNodes(String partition,
List<ConsistentHashingNode> nodes) {
- ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() ==
ConsistentHashingNode.NodeTag.NORMAL), "children nodes should not be tagged as
NORMAL");
- hashingChildrenNodes.put(partition, nodes);
- }
-
/**
* Get (construct) the bucket identifier of the given partition
*/
private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
HoodieSparkConsistentBucketIndex index =
(HoodieSparkConsistentBucketIndex) table.getIndex();
HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition,
index.getNumBuckets());
- if (hashingChildrenNodes.containsKey(partition)) {
+ if (hashingChildrenNodes != null) {
+ // for executing bucket resizing
+ ValidationUtils.checkState(hashingChildrenNodes.containsKey(partition),
"children nodes should be provided for clustering");
metadata.setChildrenNodes(hashingChildrenNodes.get(partition));
+ } else {
+ // for normal bulk insert
+ ValidationUtils.checkState(hashingChildrenNodes == null, "children nodes
should not be provided for normal bulk insert");
}
return new ConsistentBucketIdentifier(metadata);
}
@@ -122,6 +139,9 @@ public class RDDConsistentBucketBulkInsertPartitioner<T> extends RDDBucketIndexP
* the mapping from partition to its bucket identifier is constructed.
*/
private Map<String, ConsistentBucketIdentifier>
initializeBucketIdentifier(JavaRDD<HoodieRecord<T>> records) {
+ if (hashingChildrenNodes != null) {
+ return hashingChildrenNodes.keySet().stream().collect(Collectors.toMap(p
-> p, this::getBucketIdentifier));
+ }
return
records.map(HoodieRecord::getPartitionPath).distinct().collect().stream()
.collect(Collectors.toMap(p -> p, this::getBucketIdentifier));
}