This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c8431620912 [HUDI-6383] Hudi bucket index bulk insert need partition 
sort to improve speed (#8985)
c8431620912 is described below

commit c8431620912181622d152d7ba06ec6a355b420d3
Author: xuzifu666 <[email protected]>
AuthorDate: Sat Jun 17 21:48:27 2023 +0800

    [HUDI-6383] Hudi bucket index bulk insert need partition sort to improve 
speed (#8985)
    
    * Always sort the data set within each task
    * Close/release the file handle eagerly
    * The file handle #canWrite should always be true for each bucket
    
    ---------
    
    Co-authored-by: Danny Chan <[email protected]>
---
 .../BucketBulkInsertPartitionerWithRows.java       | 81 ----------------------
 .../BucketIndexBulkInsertPartitionerWithRows.java  | 49 +++++++++++++
 .../BucketBulkInsertDataInternalWriterHelper.java  | 52 +++++++-------
 .../apache/spark/sql/BucketPartitionUtils.scala    | 56 +++++++++++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  4 +-
 5 files changed, 131 insertions(+), 111 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
deleted file mode 100644
index 3efd3f61d40..00000000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.execution.bulkinsert;
-
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.index.bucket.BucketIdentifier;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-import scala.Tuple2;
-
-/**
- * Bulk_insert partitioner of Spark row using bucket index.
- */
-public class BucketBulkInsertPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
-
-  private final String indexKeyFields;
-  private final int bucketNum;
-
-  public BucketBulkInsertPartitionerWithRows(String indexKeyFields, int 
bucketNum) {
-    this.indexKeyFields = indexKeyFields;
-    this.bucketNum = bucketNum;
-  }
-
-  @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputPartitions) {
-    Partitioner partitioner = new Partitioner() {
-      @Override
-      public int getPartition(Object key) {
-        return (Integer) key;
-      }
-
-      @Override
-      public int numPartitions() {
-        return outputPartitions;
-      }
-    };
-
-    JavaRDD<Row> rddRows = rows.toJavaRDD()
-        .mapToPair(row -> new Tuple2<>(getPartitionKey(row, 
this.indexKeyFields, this.bucketNum, outputPartitions), row))
-        .partitionBy(partitioner)
-        .values();
-    return rows.sparkSession().createDataFrame(rddRows, rows.schema());
-  }
-
-  @Override
-  public boolean arePartitionRecordsSorted() {
-    return false;
-  }
-
-  private static int getPartitionKey(Row row, String indexKeyFields, int 
bucketNum, int partitionNum) {
-    int bucketId = 
BucketIdentifier.getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD),
 indexKeyFields, bucketNum);
-    String partition = 
row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
-    if (partition == null || partition.trim().isEmpty()) {
-      return bucketId;
-    } else {
-      int pw = (partition.hashCode() & Integer.MAX_VALUE) % partitionNum;
-      return BucketIdentifier.mod(bucketId + pw, partitionNum);
-    }
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
new file mode 100644
index 00000000000..ef32d24deb8
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.sql.BucketPartitionUtils$;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * Bulk_insert partitioner of Spark row using bucket index.
+ */
+public class BucketIndexBulkInsertPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
+
+  private final String indexKeyFields;
+  private final int bucketNum;
+
+  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int 
bucketNum) {
+    this.indexKeyFields = indexKeyFields;
+    this.bucketNum = bucketNum;
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputPartitions) {
+    return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, 
bucketNum, outputPartitions);
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index 5c0816eb255..d6e83c7213a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.bucket.BucketIdentifier;
@@ -34,7 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 
 /**
  * Helper class for native row writer for bulk_insert with bucket index.
@@ -43,9 +43,9 @@ public class BucketBulkInsertDataInternalWriterHelper extends 
BulkInsertDataInte
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BucketBulkInsertDataInternalWriterHelper.class);
 
-  private int lastKnownBucketNum = -1;
-  // p -> (bucketNum -> handle)
-  private final Map<String, Map<Integer, HoodieRowCreateHandle>> bucketHandles;
+  private Pair<UTF8String, Integer> lastFileId; // for efficient code path
+  // p -> (fileId -> handle)
+  private final Map<String, HoodieRowCreateHandle> handles;
   private final String indexKeyFields;
   private final int bucketNum;
 
@@ -55,7 +55,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends 
BulkInsertDataInte
     super(hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, 
taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
     this.indexKeyFields = 
writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, 
writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
     this.bucketNum = 
writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
-    this.bucketHandles = new HashMap<>();
+    this.handles = new HashMap<>();
   }
 
   public void write(InternalRow row) throws IOException {
@@ -63,18 +63,16 @@ public class BucketBulkInsertDataInternalWriterHelper 
extends BulkInsertDataInte
       UTF8String partitionPath = extractPartitionPath(row);
       UTF8String recordKey = extractRecordKey(row);
       int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), 
indexKeyFields, bucketNum);
-      if (lastKnownPartitionPath == null || 
!Objects.equals(lastKnownPartitionPath, partitionPath) || !handle.canWrite() || 
bucketId != lastKnownBucketNum) {
+      Pair<UTF8String, Integer> fileId = Pair.of(partitionPath, bucketId);
+      if (lastFileId == null || !lastFileId.equals(fileId)) {
+        LOG.info("Creating new file for partition path " + partitionPath);
         handle = getBucketRowCreateHandle(String.valueOf(partitionPath), 
bucketId);
-        // NOTE: It's crucial to make a copy here, since [[UTF8String]] could 
be pointing into
-        //       a mutable underlying buffer
-        lastKnownPartitionPath = partitionPath.clone();
-        lastKnownBucketNum = bucketId;
+        lastFileId = fileId;
       }
-
       handle.write(row);
     } catch (Throwable t) {
       LOG.error("Global error thrown while trying to write records in 
HoodieRowCreateHandle ", t);
-      throw t;
+      throw new IOException(t);
     }
   }
 
@@ -93,28 +91,26 @@ public class BucketBulkInsertDataInternalWriterHelper 
extends BulkInsertDataInte
     }
   }
 
-  protected HoodieRowCreateHandle getBucketRowCreateHandle(String 
partitionPath, int bucketId) {
-    Map<Integer, HoodieRowCreateHandle> bucketHandleMap = 
bucketHandles.computeIfAbsent(partitionPath, p -> new HashMap<>());
-    if (!bucketHandleMap.isEmpty() && bucketHandleMap.containsKey(bucketId)) {
-      return bucketHandleMap.get(bucketId);
+  protected HoodieRowCreateHandle getBucketRowCreateHandle(String fileId, int 
bucketId) throws Exception {
+    if (!handles.containsKey(fileId)) { // if there is no handle corresponding 
to the fileId
+      if (this.arePartitionRecordsSorted) {
+        // if records are sorted, we can close all existing handles
+        close();
+      }
+      HoodieRowCreateHandle rowCreateHandle = new 
HoodieRowCreateHandle(hoodieTable, writeConfig, fileId, 
getNextBucketFileId(bucketId),
+          instantTime, taskPartitionId, taskId, taskEpochId, structType, 
shouldPreserveHoodieMetadata);
+      handles.put(fileId, rowCreateHandle);
     }
-    LOG.info("Creating new file for partition path {} and bucket {}", 
partitionPath, bucketId);
-    HoodieRowCreateHandle rowCreateHandle = new 
HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextBucketFileId(bucketId),
-        instantTime, taskPartitionId, taskId, taskEpochId, structType, 
shouldPreserveHoodieMetadata);
-    bucketHandleMap.put(bucketId, rowCreateHandle);
-    return rowCreateHandle;
+    return handles.get(fileId);
   }
 
   @Override
   public void close() throws IOException {
-    for (Map<Integer, HoodieRowCreateHandle> entry : bucketHandles.values()) {
-      for (HoodieRowCreateHandle rowCreateHandle : entry.values()) {
-        LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
-        writeStatusList.add(rowCreateHandle.close());
-      }
-      entry.clear();
+    for (HoodieRowCreateHandle handle : handles.values()) {
+      LOG.info("Closing bulk insert file " + handle.getFileName());
+      writeStatusList.add(handle.close());
     }
-    bucketHandles.clear();
+    handles.clear();
     handle = null;
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
new file mode 100644
index 00000000000..d5b840d7ffc
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.spark.Partitioner
+import org.apache.spark.sql.catalyst.InternalRow
+
+object BucketPartitionUtils {
+  def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int, 
partitionNum: Int): DataFrame = {
+    def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => {
+      val kb = BucketIdentifier
+        .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), 
indexKeyFields, bucketNum)
+      val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
+      if (partition == null || partition.trim.isEmpty) {
+        ("", kb)
+      } else {
+        (partition, kb)
+      }
+    }
+
+    val getPartitionKey = getPartitionKeyExtractor()
+    val partitioner = new Partitioner {
+      override def numPartitions: Int = partitionNum
+
+      override def getPartition(key: Any): Int = {
+        val t = key.asInstanceOf[(String, Int)]
+        val pw = (t._1.hashCode & Int.MaxValue) % partitionNum
+        BucketIdentifier.mod(t._2 + pw, partitionNum)
+      }
+    }
+    // use internalRow to avoid extra convert.
+    val reRdd = df.queryExecution.toRdd
+      .keyBy(row => getPartitionKey(row))
+      .repartitionAndSortWithinPartitions(partitioner)
+      .values
+    df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 39facbcadb1..5f8e1917b9b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -44,7 +44,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, 
Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, 
SchemaCompatibilityException}
-import 
org.apache.hudi.execution.bulkinsert.{BucketBulkInsertPartitionerWithRows, 
BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
+import 
org.apache.hudi.execution.bulkinsert.{BucketIndexBulkInsertPartitionerWithRows, 
BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
@@ -809,7 +809,7 @@ object HoodieSparkSqlWriter {
 
     val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if 
(populateMetaFields) {
       if (writeConfig.getIndexType == IndexType.BUCKET) {
-        new 
BucketBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault,
 writeConfig.getBucketIndexNumBuckets)
+        new 
BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault,
 writeConfig.getBucketIndexNumBuckets)
       } else {
         val userDefinedBulkInsertPartitionerOpt = 
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
         if (userDefinedBulkInsertPartitionerOpt.isPresent) {

Reply via email to