This is an automated email from the ASF dual-hosted git repository. voonhous pushed a commit to branch perf-bucket-index-key-fields-parse-once in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 29a0eadb19c7ece4e3ef22335f0cab05cb57fff9 Author: voon <[email protected]> AuthorDate: Thu Jun 11 17:30:01 2026 +0800 perf(spark): Parse bucket index hash-field config once instead of per record The String overload of BucketIdentifier.getBucketId re-parses the comma-separated hash-field config on every call (split, per-token trim, empty filter, new list). Precompute the field list once per partitioner or writer with KeyGenUtils.getIndexKeyFields, the exact parser the String overload uses, and call the existing List overloads: bucket ids are bit-identical since the downstream chain is unchanged, mirroring the precedent in HoodieBucketIndex. Covers the upsert shuffle (SparkBucketIndexPartitioner), the row-writer repartition closure (BucketPartitionUtils) and the bucket bulk-insert write path (BucketBulkInsertDataInternalWriterHelper and the consistent-hashing variant). Measured on the two existing overloads (JDK 17, single-field key, 10M iterations): ~62 ns/op with the per-call parse vs ~4 ns/op with the precomputed list. --- .../commit/BucketBulkInsertDataInternalWriterHelper.java | 11 ++++++++--- .../ConsistentBucketBulkInsertDataInternalWriterHelper.java | 2 +- .../hudi/table/action/commit/SparkBucketIndexPartitioner.java | 9 ++++++--- .../scala/org/apache/spark/sql/BucketPartitionUtils.scala | 6 +++++- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java index 15d973743fd1..6a3c5dd4912c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java @@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.bucket.BucketIdentifier; import org.apache.hudi.index.bucket.partition.NumBucketsFunction; import org.apache.hudi.io.storage.row.HoodieRowCreateHandle; +import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.table.HoodieTable; @@ -35,6 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -47,7 +49,9 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte private Pair<UTF8String, Integer> lastFileId; // for efficient code path // p -> (fileId -> handle) private final Map<Pair<UTF8String, Integer>, HoodieRowCreateHandle> handles; - protected final String indexKeyFields; + // parsed once; the per-row write path uses the List overloads so the comma-separated config + // string is not re-split per row + protected final List<String> indexKeyFieldList; protected final int bucketNum; private final boolean isNonBlockingConcurrencyControl; private final NumBucketsFunction numBucketsFunction; @@ -62,7 +66,8 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean populateMetaFields, boolean arePartitionRecordsSorted, boolean shouldPreserveHoodieMetadata) { super(hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted, shouldPreserveHoodieMetadata); - this.indexKeyFields = writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())); + this.indexKeyFieldList = KeyGenUtils.getIndexKeyFields( + writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()))); this.bucketNum = writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS); this.handles = new HashMap<>(); this.isNonBlockingConcurrencyControl = writeConfig.isNonBlockingConcurrencyControl(); @@ -73,7 +78,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte try { UTF8String partitionPath = extractPartitionPath(row); UTF8String recordKey = extractRecordKey(row); - int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFields, numBucketsFunction.getNumBuckets(partitionPath.toString())); + int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFieldList, numBucketsFunction.getNumBuckets(partitionPath.toString())); if (lastFileId == null || !Objects.equals(lastFileId.getKey(), partitionPath) || !Objects.equals(lastFileId.getValue(), bucketId)) { // NOTE: It's crucial to make a copy here, since [[UTF8String]] could be pointing into // a mutable underlying buffer diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java index 9072e32939d7..19c11caa1369 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java @@ -75,7 +75,7 @@ public class ConsistentBucketBulkInsertDataInternalWriterHelper extends BucketBu private HoodieRowCreateHandle getBucketRowCreateHandle(String partitionPath, String recordKey) { ConsistentBucketIdentifier identifier = getBucketIdentifier(partitionPath); - final ConsistentHashingNode node = identifier.getBucket(recordKey, indexKeyFields); + final ConsistentHashingNode node = identifier.getBucket(recordKey, indexKeyFieldList); String fileId = FSUtils.createNewFileId(node.getFileIdPrefix(), 0); ValidationUtils.checkArgument(node.getTag() != ConsistentHashingNode.NodeTag.NORMAL diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java index a0f778f17e6f..db7261eb8286 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java @@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.bucket.BucketIdentifier; import org.apache.hudi.index.bucket.HoodieBucketIndex; +import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; @@ -52,7 +53,9 @@ public class SparkBucketIndexPartitioner<T> extends SparkHoodiePartitioner<T> { private final int numBuckets; - private final String indexKeyField; + // parsed once; the per-record getPartition path uses the List overload of getBucketId so the + // comma-separated config string is not re-split per record + private final List<String> indexKeyFieldList; private final int totalPartitionPaths; private final List<String> partitionPaths; /** @@ -80,7 +83,7 @@ public class SparkBucketIndexPartitioner<T> extends + table.getIndex().getClass().getSimpleName()); } this.numBuckets = ((HoodieBucketIndex) table.getIndex()).getNumBuckets(); - this.indexKeyField = config.getBucketIndexHashField(); + this.indexKeyFieldList = KeyGenUtils.getIndexKeyFields(config.getBucketIndexHashField()); this.totalPartitionPaths = profile.getPartitionPaths().size(); partitionPaths = new ArrayList<>(profile.getPartitionPaths()); partitionPathOffset = new HashMap<>(); @@ -129,7 +132,7 @@ public class SparkBucketIndexPartitioner<T> extends Option<HoodieRecordLocation> location = keyLocation._2; int bucketId = location.isPresent() ? BucketIdentifier.bucketIdFromFileId(location.get().getFileId()) - : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), indexKeyField, numBuckets); + : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), indexKeyFieldList, numBuckets); return partitionPathOffset.get(partitionPath) + bucketId; } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala index da7e8c682e4e..14aff571c238 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala @@ -25,16 +25,20 @@ import org.apache.hudi.common.util.{Functions, RemotePartitionHelper} import org.apache.hudi.common.util.hash.BucketIndexUtil import org.apache.hudi.index.bucket.BucketIdentifier import org.apache.hudi.index.bucket.partition.NumBucketsFunction +import org.apache.hudi.keygen.KeyGenUtils import org.apache.spark.Partitioner import org.apache.spark.sql.catalyst.InternalRow object BucketPartitionUtils extends SparkAdapterSupport { def createDataFrame(df: DataFrame, indexKeyFields: String, numBucketsFunction: NumBucketsFunction, partitioner: Partitioner): DataFrame = { + // parse the comma-separated config once outside the per-row closure; the list is a + // serializable java.util.List, safe to capture + val indexKeyFieldList = KeyGenUtils.getIndexKeyFields(indexKeyFields) def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => { val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD) val kb = BucketIdentifier - .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), indexKeyFields, numBucketsFunction.getNumBuckets(partition)) + .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), indexKeyFieldList, numBucketsFunction.getNumBuckets(partition)) if (partition == null || partition.trim.isEmpty) { ("", kb)
