This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch HUDI-8990 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 97bf409a5ad2cb939bc941f2b94432960c17a945 Author: YueZhang <[email protected]> AuthorDate: Wed Mar 19 20:28:17 2025 +0800 finish Flink related coding and UT && Spark partitioner and index without test --- .../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++ .../hudi/index/bucket/HoodieSimpleBucketIndex.java | 4 ++ .../bucket/PartitionBucketIndexCalculator.java | 7 +- .../index/bucket/PartitionBucketIndexUtils.java | 34 +++++---- .../BucketIndexBulkInsertPartitionerWithRows.java | 14 +++- .../RDDSimpleBucketBulkInsertPartitioner.java | 14 +++- .../BucketBulkInsertDataInternalWriterHelper.java | 11 ++- .../action/commit/SparkBucketIndexPartitioner.java | 82 +++++++++++++++++++--- .../apache/spark/sql/BucketPartitionUtils.scala | 21 ++++-- .../hudi/configuration/OptionsInference.java | 2 +- .../sink/bucket/BucketBulkInsertWriterHelper.java | 11 +-- .../sink/bucket/BucketStreamWriteFunction.java | 2 - .../sink/partitioner/BucketIndexPartitioner.java | 12 ++-- .../java/org/apache/hudi/sink/utils/Pipelines.java | 16 +++-- .../hudi/sink/utils/BulkInsertFunctionWrapper.java | 3 +- .../apache/hudi/table/ITTestHoodieDataSource.java | 78 ++++++++++++++++++++ .../org/apache/hudi/utils/TestConfigurations.java | 3 + .../BaseDatasetBulkInsertCommitActionExecutor.java | 2 +- 18 files changed, 259 insertions(+), 61 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 959c94242b1..557f8672e19 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1955,6 +1955,10 @@ public class HoodieWriteConfig extends HoodieConfig { return HoodieIndex.IndexType.valueOf(getString(HoodieIndexConfig.INDEX_TYPE)); } + public String getHashingConfigInstantToLoad() { + return getString(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT); + } + public String getIndexClass() { return getString(HoodieIndexConfig.INDEX_CLASS_NAME); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java index 9901ea0b257..e19e80b82a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java @@ -140,6 +140,10 @@ public class HoodieSimpleBucketIndex extends HoodieBucketIndex { return BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, numBuckets); } + public int getBucketID(HoodieKey key, int numBuckets) { + return BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, numBuckets); + } + @Override public boolean canIndexLogFiles() { return false; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java index ff8af59a73d..65a08363402 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexCalculator.java @@ -69,12 +69,7 @@ public class 
PartitionBucketIndexCalculator implements Serializable { */ private PartitionBucketIndexCalculator(String instantToLoad, Configuration hadoopConf, String basePath) { this.instantToLoad = instantToLoad; - StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME); - StoragePath hashingBase = new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER); - StoragePath hashingConfigPath = - new StoragePath(hashingBase, - instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); - + StoragePath hashingConfigPath = PartitionBucketIndexUtils.getHashingConfigPath(basePath, instantToLoad); try (HoodieHadoopStorage storage = new HoodieHadoopStorage(hashingConfigPath, HadoopFSUtils.getStorageConf(hadoopConf))) { init(storage, hashingConfigPath); } catch (IOException e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java index 82b61acd7c7..69e29bd92e3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java @@ -49,8 +49,8 @@ public class PartitionBucketIndexUtils { private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexUtils.class); - public static boolean isPartitionSimpleBucketIndex(Configuration conf) { - StoragePath storagePath = new StoragePath(HoodieTableMetaClient.PARTITION_BUCKET_INDEX_HASHING_FOLDER); + public static boolean isPartitionSimpleBucketIndex(Configuration conf, String basePath) { + StoragePath storagePath = getHashingConfigStorageFolder(basePath); try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, HadoopFSUtils.getStorageConf(conf))) { return storage.exists(storagePath); } catch (IOException e) { @@ -58,6 +58,16 @@ public class PartitionBucketIndexUtils { } } + public static StoragePath getHashingConfigStorageFolder(String basePath) { + StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME); + return new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER); + } + + public static StoragePath getHashingConfigPath(String basePath, String instantToLoad) { + StoragePath hashingBase = getHashingConfigStorageFolder(basePath); + return new StoragePath(hashingBase, instantToLoad + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX); + } + public static boolean initHashingConfig(HoodieTableMetaClient metaClient, String expressions, String rule, @@ -104,7 +114,6 @@ public class PartitionBucketIndexUtils { Option<HoodieInstant> earliestInstant = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().firstInstant(); String instantToLoad = ""; try { - // 按时间降序排列 List<String> hashingConfigInstants = metaClient.getStorage() .listDirectEntries(new StoragePath(metaClient.getHashingMetadataConfigPath())).stream().map(info -> { String instant = getHashingConfigInstant(info.getPath().getName()); @@ -115,11 +124,15 @@ public class PartitionBucketIndexUtils { }).sorted().collect(Collectors.toList()); for (String instant : hashingConfigInstants) { - if (instants.contains(instant)) { + if (!earliestInstant.isPresent()) { instantToLoad = instant; break; - } else if (earliestInstant.isPresent() && instant.compareTo(earliestInstant.get().requestedTime()) < 
0){ + } else if (instants.contains(instant)) { instantToLoad = instant; + break; + } else if (instant.compareTo(earliestInstant.get().requestedTime()) < 0) { + instantToLoad = instant; + break; + } } @@ -133,16 +146,11 @@ public class PartitionBucketIndexUtils { } } - public static String getHashingConfigInstant(String hashingConfig) { - int lastIndex = hashingConfig.lastIndexOf('/'); - if (lastIndex == -1) { - return null; - } - String fileName = hashingConfig.substring(lastIndex + 1); - int dotIndex = fileName.indexOf('.'); + public static String getHashingConfigInstant(String hashingConfigName) { + int dotIndex = hashingConfigName.indexOf('.'); if (dotIndex == -1) { return null; } - return fileName.substring(0, dotIndex); + return hashingConfigName.substring(0, dotIndex); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java index ef32d24deb8..e80c8bfac0e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java @@ -18,7 +18,10 @@ package org.apache.hudi.execution.bulkinsert; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator; import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieTable; import org.apache.spark.sql.BucketPartitionUtils$; import org.apache.spark.sql.Dataset; @@ -31,15 +34,22 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti private final String indexKeyFields; private final int bucketNum; + private boolean isPartitionBucketIndexEnable = false; + private PartitionBucketIndexCalculator calc; - public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum) { + public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, int bucketNum, HoodieTable table) { this.indexKeyFields = indexKeyFields; this.bucketNum = bucketNum; + String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); + this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad); + if (isPartitionBucketIndexEnable) { + calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient()); + } } @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputPartitions) { - return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, bucketNum, outputPartitions); + return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, bucketNum, outputPartitions, calc, isPartitionBucketIndexEnable); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java index 8304c1031cb..d651342c368 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java @@ -23,9 +23,11 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.index.bucket.BucketIdentifier; import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex; +import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator; import org.apache.hudi.table.HoodieTable; import org.apache.spark.Partitioner; @@ -39,11 +41,18 @@ import java.util.stream.Collectors; public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> extends RDDBucketIndexPartitioner<T> { private final boolean isNonBlockingConcurrencyControl; + private boolean isPartitionBucketIndexEnable = false; + private PartitionBucketIndexCalculator calc; public RDDSimpleBucketBulkInsertPartitioner(HoodieTable table) { super(table, null, false); ValidationUtils.checkArgument(table.getIndex() instanceof HoodieSimpleBucketIndex); this.isNonBlockingConcurrencyControl = table.getConfig().isNonBlockingConcurrencyControl(); + String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); + this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad); + if (isPartitionBucketIndexEnable) { + calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient()); + } } @Override @@ -64,7 +73,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> public int getPartition(Object key) { HoodieKey hoodieKey = (HoodieKey) key; String partitionPath = hoodieKey.getPartitionPath(); - int bucketID = index.getBucketID(hoodieKey); + int bucketID = isPartitionBucketIndexEnable ? index.getBucketID(hoodieKey, calc.computeNumBuckets(partitionPath)) : index.getBucketID(hoodieKey); String fileID = partitionMapper.get(partitionPath).get(bucketID); return fileIdPrefixToBucketIndex.get(fileID); } @@ -75,11 +84,11 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> Map<String, Integer> fileIdPrefixToBucketIndex) { HoodieSimpleBucketIndex index = (HoodieSimpleBucketIndex) table.getIndex(); - int numBuckets = index.getNumBuckets(); return records .map(HoodieRecord::getPartitionPath) .distinct().collect().stream() .collect(Collectors.toMap(p -> p, p -> { + // p ==> partition path Map<Integer, HoodieRecordLocation> locationMap = index.loadBucketIdToFileIdMappingForPartition(table, p); Map<Integer, String> bucketIdToFileIdPrefixMap = new HashMap<>(); HashSet<Integer> existsBucketID = new HashSet<>(); @@ -94,6 +103,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> doAppend.add(true); }); + int numBuckets = isPartitionBucketIndexEnable ? 
calc.computeNumBuckets(p) : index.getNumBuckets(); // Generate a file that does not exist for (int i = 0; i < numBuckets; i++) { if (!existsBucketID.contains(i)) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java index 68123c02b79..d2b6a04c2e1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java @@ -19,10 +19,12 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.bucket.BucketIdentifier; +import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator; import org.apache.hudi.io.storage.row.HoodieRowCreateHandle; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.table.HoodieTable; @@ -44,6 +46,8 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInternalWriterHelper { private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertDataInternalWriterHelper.class); + private final boolean isPartitionBucketIndexEnable; + private PartitionBucketIndexCalculator calc; private Pair<UTF8String, Integer> lastFileId; // for efficient code path // p -> (fileId -> handle) @@ -66,13 +70,18 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte this.bucketNum = writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS); this.handles = new HashMap<>(); this.isNonBlockingConcurrencyControl = writeConfig.isNonBlockingConcurrencyControl(); + String hashingInstantToLoad = writeConfig.getHashingConfigInstantToLoad(); + this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad); + if (isPartitionBucketIndexEnable) { + calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, hoodieTable.getMetaClient()); + } } public void write(InternalRow row) throws IOException { try { UTF8String partitionPath = extractPartitionPath(row); UTF8String recordKey = extractRecordKey(row); - int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFields, bucketNum); + int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFields, isPartitionBucketIndexEnable ? 
calc.computeNumBuckets(partitionPath.toString()) : bucketNum); if (lastFileId == null || !Objects.equals(lastFileId.getKey(), partitionPath) || !Objects.equals(lastFileId.getValue(), bucketId)) { // NOTE: It's crucial to make a copy here, since [[UTF8String]] could be pointing into // a mutable underlying buffer diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java index b7cdef72ab1..c760d2aabdd 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java @@ -23,12 +23,14 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.bucket.BucketIdentifier; import org.apache.hudi.index.bucket.HoodieBucketIndex; +import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; @@ -49,11 +51,17 @@ import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE_T /** * Packs incoming records to be inserted into buckets (1 bucket = 1 RDD partition). + * + * For a partition-level bucket index, this partitioner creates direct lookup arrays during initialization, + * providing O(1) lookup of the partition path and local bucket ID for a given output partition number. + * The trade-off is a small amount of additional memory for the lookup arrays. 
*/ -public class SparkBucketIndexPartitioner<T> extends - SparkHoodiePartitioner<T> { +public class SparkBucketIndexPartitioner<T> extends SparkHoodiePartitioner<T> { - private final int numBuckets; + private final int totalPartitions; + private boolean isPartitionBucketIndexEnable = false; + private PartitionBucketIndexCalculator calc; + private int numBuckets; private final String indexKeyField; private final int totalPartitionPaths; private final List<String> partitionPaths; @@ -70,6 +78,15 @@ public class SparkBucketIndexPartitioner<T> extends private Map<String, Set<String>> updatePartitionPathFileIds; private final boolean isNonBlockingConcurrencyControl; + /** + * Direct mapping from partition number to partition path. + */ + private String[] partitionNumberToPath; + + /** + * Direct mapping from partition number to local bucket ID. + */ + private Integer[] partitionNumberToLocalBucketId; public SparkBucketIndexPartitioner(WorkloadProfile profile, HoodieEngineContext context, @@ -81,6 +98,11 @@ public class SparkBucketIndexPartitioner<T> extends " Bucket index partitioner should only be used by BucketIndex other than " + table.getIndex().getClass().getSimpleName()); } + String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); + this.isPartitionBucketIndexEnable = !StringUtils.isNullOrEmpty(hashingInstantToLoad); + if (isPartitionBucketIndexEnable) { + calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient()); + } this.numBuckets = ((HoodieBucketIndex) table.getIndex()).getNumBuckets(); this.indexKeyField = config.getBucketIndexHashField(); this.totalPartitionPaths = profile.getPartitionPaths().size(); @@ -89,12 +111,33 @@ public class SparkBucketIndexPartitioner<T> extends int i = 0; for (Object partitionPath : profile.getPartitionPaths()) { partitionPathOffset.put(partitionPath.toString(), i); - i += numBuckets; + if (isPartitionBucketIndexEnable) { + i += calc.computeNumBuckets(partitionPath.toString()); + } else { + i += numBuckets; + } } + this.totalPartitions = i; assignUpdates(profile); WriteOperationType operationType = profile.getOperationType(); this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType); this.isNonBlockingConcurrencyControl = config.isNonBlockingConcurrencyControl(); + + if (isPartitionBucketIndexEnable) { + this.partitionNumberToPath = new String[totalPartitions]; + this.partitionNumberToLocalBucketId = new Integer[totalPartitions]; + + for (String partitionPath : partitionPaths) { + int offset = partitionPathOffset.get(partitionPath); + int bucketsInPartition = calc.computeNumBuckets(partitionPath); + + for (int j = 0; j < bucketsInPartition; j++) { + int partitionNumber = offset + j; + partitionNumberToPath[partitionNumber] = partitionPath; + partitionNumberToLocalBucketId[partitionNumber] = j; + } + } + } } private void assignUpdates(WorkloadProfile profile) { @@ -115,33 +158,49 @@ public class SparkBucketIndexPartitioner<T> extends @Override public BucketInfo getBucketInfo(int bucketNumber) { - String partitionPath = partitionPaths.get(bucketNumber / numBuckets); + Pair<Integer, String> res = computeBucketAndPartitionPath(bucketNumber); + int bucket = res.getLeft(); + String partitionPath = res.getRight(); // Insert overwrite always generates new bucket file id if (isOverwrite) { ValidationUtils.checkArgument(!isNonBlockingConcurrencyControl, "Insert overwrite is not supported with non-blocking concurrency control"); - return new BucketInfo(BucketType.INSERT, 
BucketIdentifier.newBucketFileIdPrefix(bucketNumber % numBuckets), partitionPath); + return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucket), partitionPath); } Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds .getOrDefault(partitionPath, Collections.emptySet()).stream() - .filter(e -> e.startsWith(BucketIdentifier.bucketIdStr(bucketNumber % numBuckets))) + .filter(e -> e.startsWith(BucketIdentifier.bucketIdStr(bucket))) .findFirst()); if (fileIdOption.isPresent()) { return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath); } else { // Always write into log file instead of base file if using NB-CC if (isNonBlockingConcurrencyControl) { - String fileId = BucketIdentifier.newBucketFileIdForNBCC(bucketNumber % numBuckets); + String fileId = BucketIdentifier.newBucketFileIdForNBCC(bucket); return new BucketInfo(BucketType.UPDATE, fileId, partitionPath); } - String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketNumber % numBuckets); + String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucket); return new BucketInfo(BucketType.INSERT, fileIdPrefix, partitionPath); } } + private Pair<Integer, String> computeBucketAndPartitionPath(int bucketNumber) { + Integer bucket; + String partitionPath; + if (isPartitionBucketIndexEnable) { + bucket = partitionNumberToLocalBucketId[bucketNumber]; + partitionPath = partitionNumberToPath[bucketNumber]; + ValidationUtils.checkArgument(bucket != null && partitionPath != null); + } else { + bucket = bucketNumber % numBuckets; + partitionPath = partitionPaths.get(bucketNumber / numBuckets); + } + return Pair.of(bucket, partitionPath); + } + @Override public int numPartitions() { - return totalPartitionPaths * numBuckets; + return totalPartitions; } @Override @@ -151,7 +210,8 @@ public class SparkBucketIndexPartitioner<T> extends Option<HoodieRecordLocation> location = keyLocation._2; int bucketId = location.isPresent() ? BucketIdentifier.bucketIdFromFileId(location.get().getFileId()) - : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), indexKeyField, numBuckets); + : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), indexKeyField, + isPartitionBucketIndexEnable ? 
calc.computeNumBuckets(partitionPath) : numBuckets); return partitionPathOffset.get(partitionPath) + bucketId; } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala index 740e6799348..464438671c1 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala @@ -21,16 +21,26 @@ package org.apache.spark.sql import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.util.Functions import org.apache.hudi.common.util.hash.BucketIndexUtil -import org.apache.hudi.index.bucket.BucketIdentifier +import org.apache.hudi.index.bucket.{BucketIdentifier, PartitionBucketIndexCalculator} import org.apache.spark.Partitioner import org.apache.spark.sql.catalyst.InternalRow object BucketPartitionUtils { - def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int, partitionNum: Int): DataFrame = { + def createDataFrame(df: DataFrame, indexKeyFields: String, bucketNum: Int, partitionNum: Int, + calc: PartitionBucketIndexCalculator, isPartitionBucketIndexEnable: Boolean): DataFrame = { + + def computeBucketNumber(partition: String): Int = { + if (isPartitionBucketIndexEnable) { + calc.computeNumBuckets(partition) + } else { + bucketNum + } + } + def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => { - val kb = BucketIdentifier - .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), indexKeyFields, bucketNum) val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD) + val kb = BucketIdentifier + .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), indexKeyFields, computeBucketNumber(partition)) if (partition == null || partition.trim.isEmpty) { ("", kb) } else { @@ -48,7 +58,7 @@ object BucketPartitionUtils { override def getPartition(value: Any): Int = { val partitionKeyPair = value.asInstanceOf[(String, Int)] - partitionIndexFunc.apply(bucketNum, partitionKeyPair._1, partitionKeyPair._2) + partitionIndexFunc.apply(computeBucketNumber(partitionKeyPair._1), partitionKeyPair._1, partitionKeyPair._2) } } // use InternalRow to avoid an extra conversion. 
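To make the Spark-side layout concrete: each partitioner touched above stops assuming a uniform bucket count per partition and instead assigns every partition path a contiguous range of output partitions sized by its own bucket count, plus direct arrays for the reverse lookup. The sketch below is a minimal, self-contained illustration of that technique; it is plain Java with hypothetical names and a simplified stand-in hash instead of BucketIdentifier, not code from this patch:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only -- not Hudi code. Shows the "offset + local bucket id"
// layout that SparkBucketIndexPartitioner builds when every partition path can
// have its own bucket count.
public class PartitionOffsetSketch {

  public static void main(String[] args) {
    // Hypothetical per-partition bucket counts (in the patch these come from
    // PartitionBucketIndexCalculator.computeNumBuckets).
    Map<String, Integer> numBucketsPerPartition = new LinkedHashMap<>();
    numBucketsPerPartition.put("par1", 2);
    numBucketsPerPartition.put("par2", 2);
    numBucketsPerPartition.put("par3", 1);

    // Each partition path gets a contiguous offset range; the two lookup
    // structures give O(1) reverse mapping from a global partition number.
    Map<String, Integer> partitionPathOffset = new LinkedHashMap<>();
    List<String> partitionNumberToPath = new ArrayList<>();
    List<Integer> partitionNumberToLocalBucketId = new ArrayList<>();
    int next = 0;
    for (Map.Entry<String, Integer> e : numBucketsPerPartition.entrySet()) {
      partitionPathOffset.put(e.getKey(), next);
      for (int j = 0; j < e.getValue(); j++) {
        partitionNumberToPath.add(e.getKey());
        partitionNumberToLocalBucketId.add(j);
        next++;
      }
    }
    int totalPartitions = next; // what numPartitions() returns: 5 here, not partitions * numBuckets

    // Forward mapping: (partitionPath, recordKey) -> global output partition.
    String partition = "par2";
    int numBuckets = numBucketsPerPartition.get(partition);
    int bucketId = Math.floorMod("key-42".hashCode(), numBuckets); // stand-in for BucketIdentifier.getBucketId
    int globalPartition = partitionPathOffset.get(partition) + bucketId;

    // Reverse mapping: global output partition -> (partitionPath, local bucket id).
    System.out.printf("total=%d, global=%d -> (%s, bucket %d)%n",
        totalPartitions, globalPartition,
        partitionNumberToPath.get(globalPartition),
        partitionNumberToLocalBucketId.get(globalPartition));
  }
}

With uniform bucket counts the old bucketNumber / numBuckets arithmetic sufficed; with variable counts the precomputed arrays avoid a per-record search at the cost of O(totalPartitions) memory, which is the trade-off noted in the javadoc above.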
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java index 66d2e08417e..e2e2e6c2d55 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java @@ -99,7 +99,7 @@ public class OptionsInference { public static void setupIndexConfigs(Configuration conf) { HoodieIndex.BucketIndexEngineType engineType = OptionsResolver.getBucketEngineType(conf); if (engineType.equals(HoodieIndex.BucketIndexEngineType.SIMPLE) && - PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(HadoopConfigurations.getHadoopConf(conf))) { + PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(HadoopConfigurations.getHadoopConf(conf), conf.get(FlinkOptions.PATH))) { try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClientV2(conf)) { HoodieTableMetaClient metaClient = writeClient.getHoodieTable().getMetaClient(); String hashingConfigToLoad = PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java index 8369a0b768e..c3fc9a2e1af 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java @@ -27,6 +27,7 @@ import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle; import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; import org.apache.hudi.sink.bulk.RowDataKeyGen; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.util.Lazy; @@ -97,13 +98,13 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper { } private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf, boolean needFixedFileIdSuffix, - Lazy<org.apache.hadoop.conf.Configuration> hadoopConf) { + StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) { String recordKey = keyGen.getRecordKey(record); String partition = keyGen.getPartitionPath(record); final int numBuckets; if (!StringUtils.isNullOrEmpty(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT))) { - numBuckets = PartitionBucketIndexCalculator.getInstance(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT), hadoopConf.get(), conf.get(FlinkOptions.PATH)) - .computeNumBuckets(partition); + numBuckets = PartitionBucketIndexCalculator.getInstance(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT), + storageConf.unwrapAs(org.apache.hadoop.conf.Configuration.class), conf.get(FlinkOptions.PATH)).computeNumBuckets(partition); } else { numBuckets = conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS); } @@ -113,8 +114,8 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper { } public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf, boolean needFixedFileIdSuffix, - 
Lazy<org.apache.hadoop.conf.Configuration> hadoopConf) { - final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, hadoopConf); + StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) { + final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, storageConf); return GenericRowData.of(StringData.fromString(fileId), record); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java index 2a3e1e154dd..9cc23e1d0e0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java @@ -139,9 +139,7 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { bootstrapIndexIfNeed(partition); } Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>()); - int numBuckets = getOrComputeNumBuckets(partition); - final int bucketNum = BucketIdentifier.getBucketId(record.getRecordKey(), indexKeyFields, numBuckets); final String bucketId = partition + "/" + bucketNum; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java index ecf4656d7ee..ca7cf4023c5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.hash.BucketIndexUtil; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.index.bucket.BucketIdentifier; import org.apache.hudi.index.bucket.PartitionBucketIndexCalculator; -import org.apache.hudi.util.Lazy; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.configuration.Configuration; @@ -41,17 +41,16 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner< private final Configuration conf; private final String indexKeyFields; private final String hashingInstantToLoad; - private org.apache.hadoop.conf.Configuration hadoopConf; + private StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf; private Functions.Function3<Integer, String, Integer, Integer> partitionIndexFunc; - public BucketIndexPartitioner(Configuration conf, String indexKeyFields, Lazy<org.apache.hadoop.conf.Configuration> hadoopConf) { + public BucketIndexPartitioner(Configuration conf, String indexKeyFields, + StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) { this.conf = conf; this.indexKeyFields = indexKeyFields; this.hashingInstantToLoad = conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT); - if (!StringUtils.isNullOrEmpty(hashingInstantToLoad)) { - this.hadoopConf = hadoopConf.get(); - } + this.storageConf = storageConf; } @Override @@ -62,6 +61,7 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner< int bucketNum; if (!StringUtils.isNullOrEmpty(hashingInstantToLoad)) { + org.apache.hadoop.conf.Configuration hadoopConf = 
storageConf.unwrapAs(org.apache.hadoop.conf.Configuration.class); PartitionBucketIndexCalculator calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, hadoopConf, conf.get(FlinkOptions.PATH)); bucketNum = calc.computeNumBuckets(key.getPartitionPath()); } else { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index d604567246d..d278d807064 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -53,8 +53,9 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketIndexPartitioner; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.table.format.FilePathUtils; -import org.apache.hudi.util.Lazy; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -122,8 +123,9 @@ public class Pipelines { "Consistent hashing bucket index does not work with bulk insert using FLINK engine. Use simple bucket index or Spark engine."); } String indexKeys = OptionsResolver.getIndexKeyField(conf); - Lazy<org.apache.hadoop.conf.Configuration> hadoopConf = Lazy.lazily(() -> HadoopConfigurations.getHadoopConf(conf)); - BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeys, hadoopConf); + StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf = StringUtils.isNullOrEmpty(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT)) ? 
+ null : new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)); + BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeys, storageConf); RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType); RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType); InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId); @@ -131,7 +133,7 @@ public class Pipelines { Map<String, String> bucketIdToFileId = new HashMap<>(); dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey) - .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, hadoopConf), typeInfo) + .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, conf, needFixedFileIdSuffix, storageConf), typeInfo) .setParallelism(PARALLELISM_VALUE); if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) { SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId); @@ -341,7 +343,9 @@ public class Pipelines { String indexKeyFields = OptionsResolver.getIndexKeyField(conf); // [HUDI-9036] BucketIndexPartitioner is also used in bulk insert mode, // keep use of HoodieKey here in partitionCustom for now - BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeyFields, Lazy.lazily(() -> HadoopConfigurations.getHadoopConf(conf))); + StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf = StringUtils.isNullOrEmpty(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_LOAD_INSTANT)) ? + null : new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)); + BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeyFields, storageConf); return dataStream .partitionCustom( partitioner, @@ -505,6 +509,8 @@ public class Pipelines { : databaseName + "." + conf.getString(FlinkOptions.TABLE_NAME); } + + /** * Dummy sink that does nothing. 
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java index a4da9049dce..eb6a8e3c25d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java @@ -32,6 +32,7 @@ import org.apache.hudi.sink.bulk.sort.SortOperator; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; import org.apache.hudi.sink.common.AbstractWriteFunction; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.Lazy; import org.apache.hudi.util.StreamerUtil; @@ -221,7 +222,7 @@ public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> { boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf); this.bucketIdToFileId = new HashMap<>(); this.mapFunction = r -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r, indexKeys, conf, needFixedFileIdSuffix, - Lazy.lazily(() -> HadoopConfigurations.getHadoopConf(conf))); + new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf))); } private void setupSortOperator() throws Exception { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index a2a31b492e9..17ff21c3938 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -23,13 +23,16 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.table.catalog.HoodieCatalogTestUtils; import org.apache.hudi.table.catalog.HoodieHiveCatalog; import org.apache.hudi.util.StreamerUtil; @@ -66,10 +69,12 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -89,6 +94,7 @@ import static org.apache.hudi.utils.TestData.assertRowsEquals; import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered; import static org.apache.hudi.utils.TestData.map; import static 
org.apache.hudi.utils.TestData.row; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1258,6 +1264,78 @@ public class ITTestHoodieDataSource { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } + @ParameterizedTest + @ValueSource(strings = {"bulk_insert", "upsert"}) + void testBulkInsertWithPartitionBucketIndex(String operationType) throws IOException { + TableEnvironment tableEnv = batchTableEnv; + // csv source + String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data"); + tableEnv.executeSql(csvSourceDDL); + String catalogName = "hudi_" + operationType; + String hudiCatalogDDL = catalog(catalogName) + .catalogPath(tempFile.getAbsolutePath()) + .end(); + + tableEnv.executeSql(hudiCatalogDDL); + String dbName = "hudi"; + tableEnv.executeSql("create database " + catalogName + "." + dbName); + String basePath = tempFile.getAbsolutePath() + "/hudi/hoodie_sink"; + + String hoodieTableDDL = sql(catalogName + ".hudi.hoodie_sink") + .option(FlinkOptions.PATH, basePath) + .option(FlinkOptions.OPERATION, operationType) + .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true) + .option(FlinkOptions.INDEX_TYPE, "BUCKET") + .option(FlinkOptions.HIVE_STYLE_PARTITIONING, "true") + .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, "1") + .option(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, "regex") + .option(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, "partition=(par1|par2),2") + .end(); + tableEnv.executeSql(hoodieTableDDL); + + String insertInto = "insert into " + catalogName + ".hudi.hoodie_sink select * from csv_source"; + execInsertSql(tableEnv, insertInto); + + List<Row> result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from " + catalogName + ".hudi.hoodie_sink").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + // apply filters + List<Row> result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from " + catalogName + ".hudi.hoodie_sink where uuid > 'id5'").execute().collect()); + assertRowsEquals(result2, "[" + + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, new org.apache.hadoop.conf.Configuration()); + + List<StoragePathInfo> allFiles = metaClient.getStorage().listDirectEntries(new StoragePath(basePath)).stream().flatMap(path -> { + try { + return metaClient.getStorage().listDirectEntries(path.getPath()).stream(); + } catch (IOException e) { + return Stream.empty(); + } + }).collect(Collectors.toList()); + + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), allFiles); + List<String> actual = fsView.getAllFileGroups().map(group -> { + return group.getPartitionPath() + group.getFileGroupId().getFileId().split("-")[0]; + }).collect(Collectors.toList()); + + // based on expression partition=(par1|par2),2 and default bucket number 1 + // par1 and par2 have two buckets. + // par3 and par4 have one bucket. 
+ ArrayList<String> expected = new ArrayList<>(); + expected.add("partition=par1" + "00000000"); + expected.add("partition=par1" + "00000001"); + expected.add("partition=par2" + "00000000"); + expected.add("partition=par2" + "00000001"); + expected.add("partition=par3" + "00000000"); + expected.add("partition=par4" + "00000000"); + + assertEquals(expected.stream().sorted().collect(Collectors.toList()), actual.stream().sorted().collect(Collectors.toList())); + } + @Test void testBulkInsertWithSortByRecordKey() { TableEnvironment tableEnv = batchTableEnv; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 92d44a8e7ef..ad07f95c3b6 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -400,6 +400,9 @@ public class TestConfigurations { return TestConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options, this.withPartition, this.pkField, this.partitionField); } + public Map<String, String> getOptions() { + return this.options; + } } /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java index 9d643211c92..3bb0c2d585a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java @@ -120,7 +120,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria if (writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET) { if (writeConfig.getBucketIndexEngineType() == HoodieIndex.BucketIndexEngineType.SIMPLE) { return new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(), - writeConfig.getBucketIndexNumBuckets()); + writeConfig.getBucketIndexNumBuckets(), table); } else { return new ConsistentBucketIndexBulkInsertPartitionerWithRows(table, Collections.emptyMap(), true); }
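For context on the expectations in testBulkInsertWithPartitionBucketIndex: the table is created with rule "regex", expressions "partition=(par1|par2),2", and a default bucket number of 1, so partition=par1 and partition=par2 resolve to two buckets each while partition=par3 and partition=par4 fall back to one, giving the six expected file groups. Below is a minimal sketch of how such a regex rule can be evaluated; it is illustrative only — this computeNumBuckets is a hypothetical stand-in for PartitionBucketIndexCalculator, and the ';' separator between expressions is an assumption of the sketch:

import java.util.Arrays;
import java.util.regex.Pattern;

// Illustrative sketch only -- not the Hudi implementation.
public class PartitionBucketRuleSketch {

  static int computeNumBuckets(String partitionPath, String expressions, int defaultBucketNumber) {
    // Each expression is "<regex>,<bucketNumber>"; multiple expressions are
    // assumed here to be ';'-separated, first match wins.
    for (String expression : expressions.split(";")) {
      int idx = expression.lastIndexOf(',');
      Pattern regex = Pattern.compile(expression.substring(0, idx));
      int buckets = Integer.parseInt(expression.substring(idx + 1).trim());
      if (regex.matcher(partitionPath).matches()) {
        return buckets;
      }
    }
    // No expression matched: fall back to the table-level bucket number
    // (FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, set to 1 in the test above).
    return defaultBucketNumber;
  }

  public static void main(String[] args) {
    for (String p : Arrays.asList("partition=par1", "partition=par2", "partition=par3", "partition=par4")) {
      // par1/par2 match the regex -> 2 buckets each; par3/par4 fall back to 1,
      // matching the six file groups the test asserts on (2 + 2 + 1 + 1).
      System.out.println(p + " -> " + computeNumBuckets(p, "partition=(par1|par2),2", 1));
    }
  }
}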
