This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c8431620912 [HUDI-6383] Hudi bucket index bulk insert need partition
sort to improve speed (#8985)
c8431620912 is described below
commit c8431620912181622d152d7ba06ec6a355b420d3
Author: xuzifu666 <[email protected]>
AuthorDate: Sat Jun 17 21:48:27 2023 +0800
[HUDI-6383] Hudi bucket index bulk insert need partition sort to improve
speed (#8985)
* Always sort the data set within each task
* Close and release the file handle eagerly
* the file handle #canWrite should be always true for each bucket
---------
Co-authored-by: Danny Chan <[email protected]>
---
.../BucketBulkInsertPartitionerWithRows.java | 81 ----------------------
.../BucketIndexBulkInsertPartitionerWithRows.java | 49 +++++++++++++
.../BucketBulkInsertDataInternalWriterHelper.java | 52 +++++++-------
.../apache/spark/sql/BucketPartitionUtils.scala | 56 +++++++++++++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 4 +-
5 files changed, 131 insertions(+), 111 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
deleted file mode 100644
index 3efd3f61d40..00000000000
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.execution.bulkinsert;
-
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.index.bucket.BucketIdentifier;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-import scala.Tuple2;
-
-/**
- * Bulk_insert partitioner of Spark row using bucket index.
- */
-public class BucketBulkInsertPartitionerWithRows implements
BulkInsertPartitioner<Dataset<Row>> {
-
- private final String indexKeyFields;
- private final int bucketNum;
-
- public BucketBulkInsertPartitionerWithRows(String indexKeyFields, int
bucketNum) {
- this.indexKeyFields = indexKeyFields;
- this.bucketNum = bucketNum;
- }
-
- @Override
- public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputPartitions) {
- Partitioner partitioner = new Partitioner() {
- @Override
- public int getPartition(Object key) {
- return (Integer) key;
- }
-
- @Override
- public int numPartitions() {
- return outputPartitions;
- }
- };
-
- JavaRDD<Row> rddRows = rows.toJavaRDD()
- .mapToPair(row -> new Tuple2<>(getPartitionKey(row,
this.indexKeyFields, this.bucketNum, outputPartitions), row))
- .partitionBy(partitioner)
- .values();
- return rows.sparkSession().createDataFrame(rddRows, rows.schema());
- }
-
- @Override
- public boolean arePartitionRecordsSorted() {
- return false;
- }
-
- private static int getPartitionKey(Row row, String indexKeyFields, int
bucketNum, int partitionNum) {
- int bucketId =
BucketIdentifier.getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD),
indexKeyFields, bucketNum);
- String partition =
row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
- if (partition == null || partition.trim().isEmpty()) {
- return bucketId;
- } else {
- int pw = (partition.hashCode() & Integer.MAX_VALUE) % partitionNum;
- return BucketIdentifier.mod(bucketId + pw, partitionNum);
- }
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
new file mode 100644
index 00000000000..ef32d24deb8
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.sql.BucketPartitionUtils$;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * Bulk_insert partitioner of Spark row using bucket index.
+ */
+public class BucketIndexBulkInsertPartitionerWithRows implements
BulkInsertPartitioner<Dataset<Row>> {
+
+ private final String indexKeyFields;
+ private final int bucketNum;
+
+ public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int
bucketNum) {
+ this.indexKeyFields = indexKeyFields;
+ this.bucketNum = bucketNum;
+ }
+
+ @Override
+ public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputPartitions) {
+ return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields,
bucketNum, outputPartitions);
+ }
+
+ @Override
+ public boolean arePartitionRecordsSorted() {
+ return true;
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index 5c0816eb255..d6e83c7213a 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.bucket.BucketIdentifier;
@@ -34,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
/**
* Helper class for native row writer for bulk_insert with bucket index.
@@ -43,9 +43,9 @@ public class BucketBulkInsertDataInternalWriterHelper extends
BulkInsertDataInte
private static final Logger LOG =
LoggerFactory.getLogger(BucketBulkInsertDataInternalWriterHelper.class);
- private int lastKnownBucketNum = -1;
- // p -> (bucketNum -> handle)
- private final Map<String, Map<Integer, HoodieRowCreateHandle>> bucketHandles;
+ private Pair<UTF8String, Integer> lastFileId; // for efficient code path
+ // p -> (fileId -> handle)
+ private final Map<String, HoodieRowCreateHandle> handles;
private final String indexKeyFields;
private final int bucketNum;
@@ -55,7 +55,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends
BulkInsertDataInte
super(hoodieTable, writeConfig, instantTime, taskPartitionId, taskId,
taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
this.indexKeyFields =
writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD,
writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
this.bucketNum =
writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
- this.bucketHandles = new HashMap<>();
+ this.handles = new HashMap<>();
}
public void write(InternalRow row) throws IOException {
@@ -63,18 +63,16 @@ public class BucketBulkInsertDataInternalWriterHelper
extends BulkInsertDataInte
UTF8String partitionPath = extractPartitionPath(row);
UTF8String recordKey = extractRecordKey(row);
int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey),
indexKeyFields, bucketNum);
- if (lastKnownPartitionPath == null ||
!Objects.equals(lastKnownPartitionPath, partitionPath) || !handle.canWrite() ||
bucketId != lastKnownBucketNum) {
+ Pair<UTF8String, Integer> fileId = Pair.of(partitionPath, bucketId);
+ if (lastFileId == null || !lastFileId.equals(fileId)) {
+ LOG.info("Creating new file for partition path " + partitionPath);
handle = getBucketRowCreateHandle(String.valueOf(partitionPath),
bucketId);
- // NOTE: It's crucial to make a copy here, since [[UTF8String]] could
be pointing into
- // a mutable underlying buffer
- lastKnownPartitionPath = partitionPath.clone();
- lastKnownBucketNum = bucketId;
+ lastFileId = fileId;
}
-
handle.write(row);
} catch (Throwable t) {
LOG.error("Global error thrown while trying to write records in
HoodieRowCreateHandle ", t);
- throw t;
+ throw new IOException(t);
}
}
@@ -93,28 +91,26 @@ public class BucketBulkInsertDataInternalWriterHelper
extends BulkInsertDataInte
}
}
- protected HoodieRowCreateHandle getBucketRowCreateHandle(String
partitionPath, int bucketId) {
- Map<Integer, HoodieRowCreateHandle> bucketHandleMap =
bucketHandles.computeIfAbsent(partitionPath, p -> new HashMap<>());
- if (!bucketHandleMap.isEmpty() && bucketHandleMap.containsKey(bucketId)) {
- return bucketHandleMap.get(bucketId);
+ protected HoodieRowCreateHandle getBucketRowCreateHandle(String fileId, int
bucketId) throws Exception {
+ if (!handles.containsKey(fileId)) { // if there is no handle corresponding
to the fileId
+ if (this.arePartitionRecordsSorted) {
+ // if records are sorted, we can close all existing handles
+ close();
+ }
+ HoodieRowCreateHandle rowCreateHandle = new
HoodieRowCreateHandle(hoodieTable, writeConfig, fileId,
getNextBucketFileId(bucketId),
+ instantTime, taskPartitionId, taskId, taskEpochId, structType,
shouldPreserveHoodieMetadata);
+ handles.put(fileId, rowCreateHandle);
}
- LOG.info("Creating new file for partition path {} and bucket {}",
partitionPath, bucketId);
- HoodieRowCreateHandle rowCreateHandle = new
HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextBucketFileId(bucketId),
- instantTime, taskPartitionId, taskId, taskEpochId, structType,
shouldPreserveHoodieMetadata);
- bucketHandleMap.put(bucketId, rowCreateHandle);
- return rowCreateHandle;
+ return handles.get(fileId);
}
@Override
public void close() throws IOException {
- for (Map<Integer, HoodieRowCreateHandle> entry : bucketHandles.values()) {
- for (HoodieRowCreateHandle rowCreateHandle : entry.values()) {
- LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
- writeStatusList.add(rowCreateHandle.close());
- }
- entry.clear();
+ for (HoodieRowCreateHandle handle : handles.values()) {
+ LOG.info("Closing bulk insert file " + handle.getFileName());
+ writeStatusList.add(handle.close());
}
- bucketHandles.clear();
+ handles.clear();
handle = null;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
new file mode 100644
index 00000000000..d5b840d7ffc
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.spark.Partitioner
+import org.apache.spark.sql.catalyst.InternalRow
+
+object BucketPartitionUtils {
+ def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int,
partitionNum: Int): DataFrame = {
+ def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => {
+ val kb = BucketIdentifier
+ .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD),
indexKeyFields, bucketNum)
+ val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
+ if (partition == null || partition.trim.isEmpty) {
+ ("", kb)
+ } else {
+ (partition, kb)
+ }
+ }
+
+ val getPartitionKey = getPartitionKeyExtractor()
+ val partitioner = new Partitioner {
+ override def numPartitions: Int = partitionNum
+
+ override def getPartition(key: Any): Int = {
+ val t = key.asInstanceOf[(String, Int)]
+ val pw = (t._1.hashCode & Int.MaxValue) % partitionNum
+ BucketIdentifier.mod(t._2 + pw, partitionNum)
+ }
+ }
+ // use internalRow to avoid extra convert.
+ val reRdd = df.queryExecution.toRdd
+ .keyBy(row => getPartitionKey(row))
+ .repartitionAndSortWithinPartitions(partitioner)
+ .values
+ df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 39facbcadb1..5f8e1917b9b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -44,7 +44,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils,
Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException,
SchemaCompatibilityException}
-import
org.apache.hudi.execution.bulkinsert.{BucketBulkInsertPartitionerWithRows,
BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
+import
org.apache.hudi.execution.bulkinsert.{BucketIndexBulkInsertPartitionerWithRows,
BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.internal.DataSourceInternalWriterHelper
@@ -809,7 +809,7 @@ object HoodieSparkSqlWriter {
val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if
(populateMetaFields) {
if (writeConfig.getIndexType == IndexType.BUCKET) {
- new
BucketBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault,
writeConfig.getBucketIndexNumBuckets)
+ new
BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault,
writeConfig.getBucketIndexNumBuckets)
} else {
val userDefinedBulkInsertPartitionerOpt =
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {