This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 24f0db68904 [HUDI-8678] feat: improve consistent-bucket resizing performance by reducing unnecessary record collecting (#12451)
24f0db68904 is described below

commit 24f0db68904b78ef10c7594b26660ddbcb0c00c7
Author: TheR1sing3un <[email protected]>
AuthorDate: Mon Jan 6 12:20:53 2025 +0800

    [HUDI-8678] feat: improve consistent-bucket resizing performance by reducing unnecessary record collecting (#12451)
    
    * feat: improve consistent-bucket resizing performance by reducing unnecessary record collecting
    * refactor: remove ConsistentHashingBucketInsertPartitioner
    
    ---------
    
    Signed-off-by: TheR1sing3un <[email protected]>
---
 .../ConsistentHashingBucketInsertPartitioner.java  | 33 -------------
 ...onsistentBucketClusteringExecutionStrategy.java | 23 ++++-----
 ...ntBucketIndexBulkInsertPartitionerWithRows.java | 43 +++++++++++------
 .../RDDConsistentBucketBulkInsertPartitioner.java  | 54 +++++++++++++++-------
 4 files changed, 77 insertions(+), 76 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/ConsistentHashingBucketInsertPartitioner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/ConsistentHashingBucketInsertPartitioner.java
deleted file mode 100644
index b71d8768a2d..00000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/ConsistentHashingBucketInsertPartitioner.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table;
-
-import org.apache.hudi.common.model.ConsistentHashingNode;
-
-import java.util.List;
-
-public interface ConsistentHashingBucketInsertPartitioner {
-  /**
-   * Set consistent hashing for partition, used in clustering
-   *
-   * @param partition partition to set Consistent Hashing nodes
-   * @param nodes     nodes from clustering plan
-   */
-  void addHashingChildrenNodes(String partition, List<ConsistentHashingNode> 
nodes);
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index e897c8717e2..6b637350f88 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -26,11 +26,11 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import 
org.apache.hudi.execution.bulkinsert.ConsistentBucketIndexBulkInsertPartitionerWithRows;
 import 
org.apache.hudi.execution.bulkinsert.RDDConsistentBucketBulkInsertPartitioner;
-import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
@@ -41,6 +41,7 @@ import org.apache.spark.sql.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -72,10 +73,9 @@ public class 
SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
 
     HoodieWriteConfig newConfig = 
HoodieWriteConfig.newBuilder().withProps(props).build();
 
-    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner =
-        new 
ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), 
strategyParams, shouldPreserveHoodieMetadata);
-
-    addHashingChildNodes(partitioner, extraMetadata);
+    Pair<String, List<ConsistentHashingNode>> childNodesPair = 
extractChildNodes(extraMetadata);
+    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new 
ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), 
strategyParams, shouldPreserveHoodieMetadata,
+        Collections.singletonMap(childNodesPair.getKey(), 
childNodesPair.getValue()));
 
     Dataset<Row> repartitionedRecords = 
partitioner.repartitionRecords(inputRecords, numOutputGroups);
 
@@ -94,20 +94,21 @@ public class 
SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
     props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), 
Boolean.FALSE.toString());
     HoodieWriteConfig newConfig = 
HoodieWriteConfig.newBuilder().withProps(props).build();
 
-    RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new 
RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams, 
preserveHoodieMetadata);
-    addHashingChildNodes(partitioner, extraMetadata);
+    Pair<String, List<ConsistentHashingNode>> childNodesPair = 
extractChildNodes(extraMetadata);
+    RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new 
RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams, 
preserveHoodieMetadata,
+        Collections.singletonMap(childNodesPair.getKey(), 
childNodesPair.getValue()));
 
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
         .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, 
false, partitioner, true, numOutputGroups);
   }
 
-  private void addHashingChildNodes(ConsistentHashingBucketInsertPartitioner 
partitioner, Map<String, String> extraMetadata) {
+  private Pair<String/*partition*/, List<ConsistentHashingNode>> 
extractChildNodes(Map<String, String> extraMetadata) {
     try {
       List<ConsistentHashingNode> nodes = 
ConsistentHashingNode.fromJsonString(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
-      
partitioner.addHashingChildrenNodes(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
 nodes);
+      return 
Pair.of(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
 nodes);
     } catch (Exception e) {
-      LOG.error("Failed to add hashing children nodes", e);
-      throw new HoodieClusteringException("Failed to add hashing children 
nodes", e);
+      LOG.error("Failed to extract hashing children nodes", e);
+      throw new HoodieClusteringException("Failed to extract hashing children 
nodes", e);
     }
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java
index b3a714eb5b1..24ef7fd1871 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java
@@ -31,7 +31,6 @@ import 
org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.BucketSortBulkInsertPartitioner;
-import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.Partitioner;
@@ -42,7 +41,6 @@ import org.apache.spark.sql.Row;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -54,14 +52,13 @@ import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
 /**
  * Bulk_insert partitioner of Spark row using consistent hashing bucket index.
  */
-public class ConsistentBucketIndexBulkInsertPartitionerWithRows
-    extends BucketSortBulkInsertPartitioner<Dataset<Row>> implements 
ConsistentHashingBucketInsertPartitioner {
+public class ConsistentBucketIndexBulkInsertPartitionerWithRows extends 
BucketSortBulkInsertPartitioner<Dataset<Row>> {
 
   private final String indexKeyFields;
 
   private final List<String> fileIdPfxList = new ArrayList<>();
 
-  private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
+  private Map<String/*partition*/, List<ConsistentHashingNode>/*pending 
resizing related child nodes*/> hashingChildrenNodes;
 
   private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
 
@@ -73,12 +70,20 @@ public class 
ConsistentBucketIndexBulkInsertPartitionerWithRows
 
   private final boolean populateMetaFields;
 
+  public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table,
+                                                            Map<String, 
String> strategyParams, boolean populateMetaFields) {
+    this(table, strategyParams, populateMetaFields, null);
+  }
+
+  /**
+   * Constructor of ConsistentBucketIndexBulkInsertPartitionerWithRows.
+   * @param hashingChildrenNodes children nodes for clustering, only used in 
executing clustering
+   */
   public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table,
                                                             Map<String, 
String> strategyParams,
-                                                            boolean 
populateMetaFields) {
+                                                            boolean 
populateMetaFields, Map<String, List<ConsistentHashingNode>> 
hashingChildrenNodes) {
     super(table, strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(), 
""));
     this.indexKeyFields = table.getConfig().getBucketIndexHashField();
-    this.hashingChildrenNodes = new HashMap<>();
     this.populateMetaFields = populateMetaFields;
     if (!populateMetaFields) {
       this.keyGeneratorOpt = 
HoodieSparkKeyGeneratorFactory.getKeyGenerator(table.getConfig().getProps());
@@ -88,13 +93,25 @@ public class 
ConsistentBucketIndexBulkInsertPartitionerWithRows
     this.extractor = 
RowRecordKeyExtractor.getRowRecordKeyExtractor(populateMetaFields, 
keyGeneratorOpt);
     
ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
         "Consistent hash bucket index doesn't support CoW table");
+    if (hashingChildrenNodes != null) {
+      /**
+       * Set pending consistent hashing for partition.
+       * The bulk insert will directly use the pending metadata as the 
consistent hash metadata for writing data to after-resizing buckets.
+       * NOTE: Only used in the case of executing bulk insert.
+       */
+      
ValidationUtils.checkArgument(hashingChildrenNodes.values().stream().flatMap(List::stream).noneMatch(n
 -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL),
+          "children nodes should not be tagged as NORMAL");
+      this.hashingChildrenNodes = hashingChildrenNodes;
+    }
   }
 
   private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
     HoodieSparkConsistentBucketIndex index = 
(HoodieSparkConsistentBucketIndex) table.getIndex();
     HoodieConsistentHashingMetadata metadata =
         ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition, 
index.getNumBuckets());
-    if (hashingChildrenNodes.containsKey(partition)) {
+    if (hashingChildrenNodes != null) {
+      // for executing bucket resizing
+      ValidationUtils.checkState(hashingChildrenNodes.containsKey(partition), 
"children nodes should be provided for clustering");
       metadata.setChildrenNodes(hashingChildrenNodes.get(partition));
     }
     return new ConsistentBucketIdentifier(metadata);
@@ -152,17 +169,13 @@ public class 
ConsistentBucketIndexBulkInsertPartitionerWithRows
    * the mapping from partition to its bucket identifier is constructed.
    */
   private Map<String, ConsistentBucketIdentifier> 
initializeBucketIdentifier(JavaRDD<Row> rows) {
+    if (hashingChildrenNodes != null) {
+      return hashingChildrenNodes.keySet().stream().collect(Collectors.toMap(p 
-> p, this::getBucketIdentifier));
+    }
     return 
rows.map(this.extractor::getPartitionPath).distinct().collect().stream()
         .collect(Collectors.toMap(p -> p, this::getBucketIdentifier));
   }
 
-  @Override
-  public void addHashingChildrenNodes(String partition, 
List<ConsistentHashingNode> nodes) {
-    ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() == 
ConsistentHashingNode.NodeTag.NORMAL),
-        "children nodes should not be tagged as NORMAL");
-    hashingChildrenNodes.put(partition, nodes);
-  }
-
   private int getBucketId(Row row) {
     String recordKey = extractor.getRecordKey(row);
     String partitionPath = extractor.getPartitionPath(row);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
index 32b23751973..51c39105ce5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
@@ -32,7 +32,6 @@ import 
org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
 import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.WriteHandleFactory;
-import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.Partitioner;
@@ -49,25 +48,44 @@ import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
 /**
  * A partitioner for (consistent hashing) bucket index used in bulk_insert
  */
-public class RDDConsistentBucketBulkInsertPartitioner<T> extends 
RDDBucketIndexPartitioner<T> implements 
ConsistentHashingBucketInsertPartitioner {
+public class RDDConsistentBucketBulkInsertPartitioner<T> extends 
RDDBucketIndexPartitioner<T> {
 
-  private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
+  private Map<String/*partition*/, List<ConsistentHashingNode/*pending 
resizing related child nodes*/>> hashingChildrenNodes;
+
+  public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table) {
+    this(table, Collections.emptyMap(), false);
+    ValidationUtils.checkArgument(table.getIndex() instanceof 
HoodieSparkConsistentBucketIndex,
+        "RDDConsistentBucketPartitioner can only be used together with 
consistent hashing bucket index");
+  }
 
   public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table,
                                                   Map<String, String> 
strategyParams,
                                                   boolean 
preserveHoodieMetadata) {
+    this(table, strategyParams, preserveHoodieMetadata, null);
+  }
+
+  /**
+   * Constructor of RDDConsistentBucketBulkInsertPartitioner.
+   * @param hashingChildrenNodes children nodes for clustering, only used in 
executing clustering
+   */
+  public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table,
+                                                  Map<String, String> 
strategyParams,
+                                                  boolean 
preserveHoodieMetadata, Map<String, List<ConsistentHashingNode>> 
hashingChildrenNodes) {
     super(table,
         strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(), null),
         preserveHoodieMetadata);
     
ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
         "Consistent hash bucket index doesn't support CoW table");
-    this.hashingChildrenNodes = new HashMap<>();
-  }
-
-  public RDDConsistentBucketBulkInsertPartitioner(HoodieTable table) {
-    this(table, Collections.emptyMap(), false);
-    ValidationUtils.checkArgument(table.getIndex() instanceof 
HoodieSparkConsistentBucketIndex,
-        "RDDConsistentBucketPartitioner can only be used together with 
consistent hashing bucket index");
+    if (hashingChildrenNodes != null) {
+      /**
+       * Set pending consistent hashing for partition.
+       * The bulk insert will directly use the pending metadata as the 
consistent hash metadata for writing data to after-resizing buckets.
+       * NOTE: Only used in the case of executing bulk insert.
+       */
+      
ValidationUtils.checkArgument(hashingChildrenNodes.values().stream().flatMap(List::stream).noneMatch(n
 -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL),
+          "children nodes should not be tagged as NORMAL");
+      this.hashingChildrenNodes = hashingChildrenNodes;
+    }
   }
 
   /**
@@ -99,20 +117,19 @@ public class RDDConsistentBucketBulkInsertPartitioner<T> 
extends RDDBucketIndexP
     });
   }
 
-  @Override
-  public void addHashingChildrenNodes(String partition, 
List<ConsistentHashingNode> nodes) {
-    ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() == 
ConsistentHashingNode.NodeTag.NORMAL), "children nodes should not be tagged as 
NORMAL");
-    hashingChildrenNodes.put(partition, nodes);
-  }
-
   /**
    * Get (construct) the bucket identifier of the given partition
    */
   private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
     HoodieSparkConsistentBucketIndex index = 
(HoodieSparkConsistentBucketIndex) table.getIndex();
     HoodieConsistentHashingMetadata metadata = 
ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition, 
index.getNumBuckets());
-    if (hashingChildrenNodes.containsKey(partition)) {
+    if (hashingChildrenNodes != null) {
+      // for executing bucket resizing
+      ValidationUtils.checkState(hashingChildrenNodes.containsKey(partition), 
"children nodes should be provided for clustering");
       metadata.setChildrenNodes(hashingChildrenNodes.get(partition));
+    } else {
+      // for normal bulk insert
+      ValidationUtils.checkState(hashingChildrenNodes == null, "children nodes 
should not be provided for normal bulk insert");
     }
     return new ConsistentBucketIdentifier(metadata);
   }
@@ -122,6 +139,9 @@ public class RDDConsistentBucketBulkInsertPartitioner<T> 
extends RDDBucketIndexP
    * the mapping from partition to its bucket identifier is constructed.
    */
   private Map<String, ConsistentBucketIdentifier> 
initializeBucketIdentifier(JavaRDD<HoodieRecord<T>> records) {
+    if (hashingChildrenNodes != null) {
+      return hashingChildrenNodes.keySet().stream().collect(Collectors.toMap(p 
-> p, this::getBucketIdentifier));
+    }
     return 
records.map(HoodieRecord::getPartitionPath).distinct().collect().stream()
         .collect(Collectors.toMap(p -> p, this::getBucketIdentifier));
   }

Reply via email to