This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch HUDI-8990 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit aa9ed6ce12c49d9c79034453a9bba3bfaf16a9d2 Author: YueZhang <[email protected]> AuthorDate: Fri Mar 21 16:03:50 2025 +0800 finish Spark related coding and test --- .../hudi/index/bucket/HoodieSimpleBucketIndex.java | 10 +- .../index/bucket/PartitionBucketIndexUtils.java | 29 ++- .../RDDSimpleBucketBulkInsertPartitioner.java | 10 +- .../hudi/HoodieDatasetBulkInsertHelper.scala | 15 +- .../hudi/common/model/WriteOperationType.java | 2 + .../apache/hudi/table/ITTestHoodieDataSource.java | 16 +- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 19 +- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 6 +- .../hudi/command/procedures/HoodieProcedures.scala | 1 + .../procedures/PartitionBucketIndexManager.scala | 153 ++++++++------ .../spark/sql/hudi/dml/TestInsertTable.scala | 88 -------- .../TestInsertTableWithPartitionBucketIndex.scala | 223 +++++++++++++++++++++ .../TestPartitionBucketIndexManager.scala | 2 +- 13 files changed, 400 insertions(+), 174 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java index e19e80b82a4..e3bae439956 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndexUtils; @@ -156,14 +157,21 @@ public class HoodieSimpleBucketIndex extends HoodieBucketIndex { private class 
SimpleBucketIndexLocationFunction implements Function<HoodieRecord, Option<HoodieRecordLocation>> { private final Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping; + private final boolean isPartitionBucketIndexEnable; + private PartitionBucketIndexCalculator calc; public SimpleBucketIndexLocationFunction(HoodieTable table, String partitionPath) { this.bucketIdToFileIdMapping = loadBucketIdToFileIdMappingForPartition(table, partitionPath); + String hashingInstantToLoad = table.getConfig().getHashingConfigInstantToLoad(); + this.isPartitionBucketIndexEnable = StringUtils.nonEmpty(hashingInstantToLoad); + if (isPartitionBucketIndexEnable) { + calc = PartitionBucketIndexCalculator.getInstance(hashingInstantToLoad, table.getMetaClient()); + } } @Override public Option<HoodieRecordLocation> apply(HoodieRecord record) { - int bucketId = getBucketID(record.getKey()); + int bucketId = isPartitionBucketIndexEnable ? getBucketID(record.getKey(), calc.computeNumBuckets(record.getPartitionPath())) : getBucketID(record.getKey()); return Option.ofNullable(bucketIdToFileIdMapping.get(bucketId)); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java index ad1e1d0b3bf..1fbd16c28da 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/PartitionBucketIndexUtils.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.FileIOUtils; import 
org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -30,6 +31,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieInstantWriter; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; @@ -44,6 +46,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; public class PartitionBucketIndexUtils { public static final String INITIAL_HASHING_CONFIG_INSTANT = HoodieTimeline.INIT_INSTANT_TS; @@ -51,8 +54,12 @@ public class PartitionBucketIndexUtils { private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexUtils.class); public static boolean isPartitionSimpleBucketIndex(Configuration conf, String basePath) { + return isPartitionSimpleBucketIndex(HadoopFSUtils.getStorageConf(conf), basePath); + } + + public static boolean isPartitionSimpleBucketIndex(StorageConfiguration conf, String basePath) { StoragePath storagePath = getHashingConfigStorageFolder(basePath); - try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, HadoopFSUtils.getStorageConf(conf))) { + try (HoodieHadoopStorage storage = new HoodieHadoopStorage(storagePath, conf)) { return storage.exists(storagePath); } catch (IOException e) { throw new HoodieIOException("Failed to list PARTITION_BUCKET_INDEX_HASHING_FOLDER folder ", e); @@ -164,4 +171,24 @@ public class PartitionBucketIndexUtils { } return calc.getPartitionToBucket(); } + + /** + * Used for test + * @return all File id in current table using `partitionPath + FileId` format + */ + public static List<String> getAllFileIDWithPartition(HoodieTableMetaClient metaClient) throws IOException { + 
List<StoragePathInfo> allFiles = metaClient.getStorage().listDirectEntries(metaClient.getBasePath()).stream().flatMap(path -> { + try { + return metaClient.getStorage().listDirectEntries(path.getPath()).stream(); + } catch (IOException e) { + return Stream.empty(); + } + }).collect(Collectors.toList()); + + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), allFiles); + return fsView.getAllFileGroups().map(group -> { + return group.getPartitionPath() + group.getFileGroupId().getFileId().split("-")[0]; + }).collect(Collectors.toList()); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java index 393f0054398..638abe95ce9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java @@ -66,7 +66,7 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> return doPartition(records, new Partitioner() { @Override public int numPartitions() { - return index.getNumBuckets() * partitionMapper.size(); + return computeNumPartitions(); } @Override @@ -77,6 +77,14 @@ public class RDDSimpleBucketBulkInsertPartitioner<T extends HoodieRecordPayload> String fileID = partitionMapper.get(partitionPath).get(bucketID); return fileIdPrefixToBucketIndex.get(fileID); } + + private int computeNumPartitions () { + if (isPartitionBucketIndexEnable) { + return partitionMapper.values().stream().mapToInt(Map::size).sum(); + } else { + return index.getNumBuckets() * partitionMapper.size(); + } + } }); } diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index f4bb5b81e85..ab5cd537cfc 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -33,7 +33,7 @@ import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils} -import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper} +import org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper} import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable} import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked import org.apache.spark.TaskContext @@ -179,6 +179,19 @@ object HoodieDatasetBulkInsertHelper writeConfig.populateMetaFields, arePartitionRecordsSorted, shouldPreserveHoodieMetadata) + case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType + == BucketIndexEngineType.SIMPLE => + new BucketBulkInsertDataInternalWriterHelper( + table, + writeConfig, + instantTime, + taskPartitionId, + taskId, + taskEpochId, + schema, + writeConfig.populateMetaFields, + arePartitionRecordsSorted, + shouldPreserveHoodieMetadata) case _ => new BulkInsertDataInternalWriterHelper( table, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index bc82a04023c..033edc822c0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -101,6 +101,8 @@ public enum WriteOperationType { return INDEX; case "alter_schema": return ALTER_SCHEMA; + case "bucket_rescale": + return BUCKET_RESCALE; case "unknown": return UNKNOWN; default: diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 17ff21c3938..39fabe0cbde 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.index.bucket.PartitionBucketIndexUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; @@ -1307,20 +1308,7 @@ public class ITTestHoodieDataSource { + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, new org.apache.hadoop.conf.Configuration()); - - List<StoragePathInfo> allFiles = metaClient.getStorage().listDirectEntries(new StoragePath(basePath)).stream().flatMap(path -> { - try { - return metaClient.getStorage().listDirectEntries(path.getPath()).stream(); - } catch (IOException e) { - return Stream.empty(); - } - }).collect(Collectors.toList()); - - 
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), allFiles); - List<String> actual = fsView.getAllFileGroups().map(group -> { - return group.getPartitionPath() + group.getFileGroupId().getFileId().split("-")[0]; - }).collect(Collectors.toList()); + List<String> actual = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient); // based on expression partition=(par1|par2),2 and default bucket number 1 // par1 and par2 have two buckets. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 7c59e433c8a..b6a72a73e7e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -42,7 +42,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys -import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig} +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION} import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException} @@ -68,6 +68,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf import 
org.apache.hadoop.hive.shims.ShimLoader +import org.apache.hudi.index.HoodieIndex +import org.apache.hudi.index.bucket.PartitionBucketIndexUtils import org.apache.spark.{SPARK_VERSION, SparkContext} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql._ @@ -247,7 +249,7 @@ class HoodieSparkSqlWriterInternal { asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get // re-use table configs and inject defaults. - val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined) + var (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode, streamingWritesParamsOpt.isDefined) val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "") val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim @@ -331,6 +333,19 @@ class HoodieSparkSqlWriterInternal { .initTable(HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration), path) } + // take care of partition level bucket index which is simple bucket index + if (parameters.contains(HoodieIndexConfig.INDEX_TYPE.key) + && parameters(HoodieIndexConfig.INDEX_TYPE.key) == HoodieIndex.IndexType.BUCKET.name + && (!parameters.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key) + || (parameters.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key) + && parameters(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key) == HoodieIndex.IndexType.SIMPLE.name)) + && !parameters.contains(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key) + && PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(tableMetaClient.getStorageConf, basePath.toString)) { + val hashingConfigToLoad: String = 
PartitionBucketIndexUtils.getHashingConfigInstantToLoad(tableMetaClient) + hoodieConfig.setValue(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key, hashingConfigToLoad) + parameters = parameters ++ Map(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key -> hashingConfigToLoad) + } + var instantTime: String = null tableConfig = tableMetaClient.getTableConfig diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 6cf5053bdce..3908e51a5db 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -30,6 +30,8 @@ import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWr import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeysValueExtractor} import org.apache.hudi.hive.ddl.HiveSyncMode +import org.apache.hudi.index.HoodieIndex +import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator} import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.HoodieSyncConfig @@ -91,9 +93,9 @@ trait ProvidesHoodieConfig extends Logging { val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) val defaultOpts = Map[String, String]( - OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL, - HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.BUCKET_RESCALE.value(), KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, + HoodieIndexConfig.INDEX_TYPE.key() -> HoodieIndex.IndexType.BUCKET.name(), + 
HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key() -> BucketIndexEngineType.SIMPLE.name(), SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL, HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index 0a6afd4e00a..7bea2d114a4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -96,6 +96,7 @@ object HoodieProcedures { ,(RunTTLProcedure.NAME, RunTTLProcedure.builder) ,(DropPartitionProcedure.NAME, DropPartitionProcedure.builder) ,(TruncateTableProcedure.NAME, TruncateTableProcedure.builder) + ,(PartitionBucketIndexManager.NAME, PartitionBucketIndexManager.builder) ) } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala index 4aaeec80a1a..d949bbcf9a0 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala @@ -18,44 +18,38 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.avro.Schema -import org.apache.hadoop.conf.Configuration +import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, 
ENABLE_ROW_WRITER, OPERATION} import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieSparkSqlWriter} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.SparkRDDWriteClient -import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.commit.DatasetBucketRescaleCommitActionExecutor import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig, SerializableSchema} import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.{FileSlice, PartitionBucketIndexHashingConfig, WriteOperationType} +import org.apache.hudi.common.model.{PartitionBucketIndexHashingConfig, WriteOperationType} import org.apache.hudi.common.table.read.HoodieFileGroupReader import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.common.util.{Option, StringUtils} -import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.common.util.{Option, ValidationUtils} +import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig} import org.apache.hudi.exception.HoodieException import org.apache.hudi.index.bucket.{PartitionBucketIndexCalculator, PartitionBucketIndexUtils} import org.apache.hudi.internal.schema.InternalSchema -import org.apache.hudi.internal.schema.utils.SerDeHelper -import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration -import org.apache.hudi.storage.{StoragePath, StoragePathInfo} +import org.apache.hudi.storage.StoragePath import org.apache.hudi.table.SparkBroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import 
org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} import java.util import java.util.function.Supplier -import java.util.stream.Collectors import scala.collection.JavaConverters._ import scala.collection.mutable -class PartitionBucketIndexManager() extends BaseProcedure +class PartitionBucketIndexManager extends BaseProcedure with ProcedureBuilder with PredicateHelper with ProvidesHoodieConfig @@ -64,12 +58,14 @@ class PartitionBucketIndexManager() extends BaseProcedure private val PARAMETERS = Array[ProcedureParameter]( ProcedureParameter.required(0, "table", DataTypes.StringType), ProcedureParameter.optional(1, "overwrite", DataTypes.StringType), - ProcedureParameter.optional(2, "bucket-number", DataTypes.IntegerType, -1), + ProcedureParameter.optional(2, "bucketNumber", DataTypes.IntegerType, -1), ProcedureParameter.optional(3, "add", DataTypes.StringType), ProcedureParameter.optional(4, "dry-run", DataTypes.BooleanType, true), ProcedureParameter.optional(5, "rollback", DataTypes.StringType), ProcedureParameter.optional(6, "show-config", DataTypes.BooleanType, false), - ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex") + ProcedureParameter.optional(7, "rule", DataTypes.StringType, "regex"), + // params => key=value, key2=value2 + ProcedureParameter.optional(8, "options", DataTypes.StringType) ) private val OUTPUT_TYPE = new StructType(Array[StructField]( @@ -93,6 +89,7 @@ class PartitionBucketIndexManager() extends BaseProcedure val rollback = getArgValueOrDefault(args, PARAMETERS(5)).orNull.asInstanceOf[String] val showConfig = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean] val rule = getArgValueOrDefault(args, PARAMETERS(7)).orNull.asInstanceOf[String] + val options = getArgValueOrDefault(args, PARAMETERS(8)) try { // Get table metadata @@ -102,6 +99,18 @@ class 
PartitionBucketIndexManager() extends BaseProcedure val writeClient = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty, scala.Option.apply(table)) val context = writeClient.getEngineContext + var config = buildBucketRescaleHoodieConfig(hoodieCatalogTable) + + options match { + case Some(p) => + config = config ++ HoodieCLIUtils.extractOptions(p.asInstanceOf[String]) + case _ => + logInfo("No options") + } + + config = config ++ Map(OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL, + HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.BUCKET_RESCALE.value(), + ENABLE_ROW_WRITER.key() -> "true") // Determine which operation to perform if (showConfig) { @@ -109,21 +118,21 @@ class PartitionBucketIndexManager() extends BaseProcedure } else if (rollback != null) { handleRollback(metaClient, rollback) } else if (overwrite != null) { - handleOverwrite(hoodieCatalogTable, writeClient, context, metaClient, overwrite, bucketNumber, rule, dryRun) + handleOverwrite(config, writeClient, context, metaClient, overwrite, bucketNumber, rule, dryRun) } else if (add != null) { handleAdd(metaClient, add, dryRun) } else { Seq(Row("ERROR", "INVALID_OPERATION", "No valid operation specified")) } } catch { - case e: Exception => Seq(Row("ERROR", "EXCEPTION", e.getMessage)) + case e: Exception => throw new HoodieException(e.getMessage, e) } } /** * Handle the overwrite operation. 
*/ - private def handleOverwrite(catalog: HoodieCatalogTable, + private def handleOverwrite(config: Map[String, String], writeClient: SparkRDDWriteClient[_], context : HoodieEngineContext, metaClient: HoodieTableMetaClient, @@ -131,30 +140,42 @@ class PartitionBucketIndexManager() extends BaseProcedure bucketNumber: Int, rule: String, dryRun: Boolean): Seq[Row] = { - val config = buildBucketRescaleHoodieConfig(catalog).asJava val basePath = metaClient.getBasePath val mdtEnable = metaClient.getStorage().exists(new StoragePath(metaClient.getBasePath, HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH)) // get all partition paths val allPartitions = FSUtils.getAllPartitionPaths(context, metaClient.getStorage, metaClient.getBasePath, mdtEnable) + val usePartitionBucketIndexBefore = PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(context.getStorageConf, basePath.toString) - // get Map<partitionPath, bucketNumber> based on latest hashing_config - val latestHashingConfigInstant = PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient) - val calcWithLatestInstant = PartitionBucketIndexCalculator.getInstance(latestHashingConfigInstant, metaClient) - val partition2BucketWithLatestHashingConfig = PartitionBucketIndexUtils.getAllBucketNumbers(calcWithLatestInstant, allPartitions) + var partition2BucketWithLatestHashingConfig: util.Map[String, Integer] = null + var calcWithLatestInstant: PartitionBucketIndexCalculator = null + if (usePartitionBucketIndexBefore) { + // get Map<partitionPath, bucketNumber> based on latest hashing_config + val latestHashingConfigInstant = PartitionBucketIndexUtils.getHashingConfigInstantToLoad(metaClient) + calcWithLatestInstant = PartitionBucketIndexCalculator.getInstance(latestHashingConfigInstant, metaClient) + partition2BucketWithLatestHashingConfig = PartitionBucketIndexUtils.getAllBucketNumbers(calcWithLatestInstant, allPartitions) + } else { + ValidationUtils.checkArgument(bucketNumber != -1) + 
partition2BucketWithLatestHashingConfig = allPartitions.asScala.map(partition => (partition, new Integer(bucketNumber))).toMap.asJava + } // get Map<partitionPath, bucketNumber> based on new given expression val defaultBucketNumber = if (bucketNumber != -1) { bucketNumber - } else { + } else if (calcWithLatestInstant != null ) { // reuse latest default bucket number calcWithLatestInstant.getHashingConfig.getDefaultBucketNumber + } else { + throw new HoodieException("Please set original bucket number before upgrade to partition bucket level bucket index") } + val instantTime = writeClient.createNewInstantTime() - config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key(), expression) - config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key(), rule) - config.put(HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key(), instantTime) - config.put(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), bucketNumber.toString) + + val finalConfig = config ++ Map(HoodieIndexConfig.BUCKET_INDEX_PARTITION_EXPRESSIONS.key()-> expression, + HoodieIndexConfig.BUCKET_INDEX_PARTITION_RULE_TYPE.key() -> rule, + HoodieIndexConfig.BUCKET_INDEX_PARTITION_LOAD_INSTANT.key() -> instantTime, + HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key() -> bucketNumber.toString + ) val newConfig = new PartitionBucketIndexHashingConfig(expression, defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, instantTime) val calcWithNewExpression = PartitionBucketIndexCalculator.getInstance(instantTime, newConfig) @@ -166,7 +187,9 @@ class PartitionBucketIndexManager() extends BaseProcedure // get all fileSlices need to read val allFilesMap = FSUtils.getFilesInPartitions(context, metaClient.getStorage(), HoodieMetadataConfig.newBuilder.enable(mdtEnable).build, - metaClient.getBasePath.toString, partitionsToRescale.toArray) + metaClient.getBasePath.toString, partitionsToRescale.map(relative => { + new StoragePath(basePath, relative) + }).map(storagePath => 
storagePath.toString).toArray) val files = allFilesMap.values().asScala.flatMap(x => x.asScala).toList val view = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline, files.asJava) val allFileSlice = partitionsToRescale.flatMap(partitionPath => { @@ -185,42 +208,46 @@ class PartitionBucketIndexManager() extends BaseProcedure val broadcastManager = new SparkBroadcastManager(context, metaClient) broadcastManager.prepareAndBroadcast() val sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields) - val serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields) - val latestInstantTime = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get() - - val res: RDD[InternalRow] = spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => { - // instantiate other supporting cast - val readerSchema = serializableTableSchemaWithMetaFields.get - val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath) - val internalSchemaOption: Option[InternalSchema] = Option.empty() - // instantiate FG reader - val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(), - metaClient.getStorage, - basePath.toString, - latestInstantTime.requestedTime(), - fileSlice, - readerSchema, - readerSchema, - internalSchemaOption, // not support evolution of schema for now - metaClient, - metaClient.getTableConfig.getProps, - 0, - java.lang.Long.MAX_VALUE, - HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue()) - fileGroupReader.initRecordIterators() - val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]] - iterator.asScala - }) - val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields) + val res: RDD[InternalRow] = if (allFileSlice.isEmpty) { + spark.sparkContext.emptyRDD + } else { + 
val serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields) + val latestInstantTime = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get() + + spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => { + // instantiate other supporting cast + val readerSchema = serializableTableSchemaWithMetaFields.get + val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath) + val internalSchemaOption: Option[InternalSchema] = Option.empty() + // instantiate FG reader + val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(), + metaClient.getStorage, + basePath.toString, + latestInstantTime.requestedTime(), + fileSlice, + readerSchema, + readerSchema, + internalSchemaOption, // not support evolution of schema for now + metaClient, + metaClient.getTableConfig.getProps, + 0, + java.lang.Long.MAX_VALUE, + HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.defaultValue()) + fileGroupReader.initRecordIterators() + val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]] + iterator.asScala + }) + } + val dataFrame = HoodieUnsafeUtils.createDataFrameFromRDD(sparkSession, res, sparkSchemaWithMetaFields) val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write( sparkSession.sqlContext, - SaveMode.Overwrite, - config.asScala.toMap, + SaveMode.Append, + finalConfig, dataFrame) - val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: $dryRun" - Seq(Row("SUCCESS", "OVERWRITE", success)) + val details = s"Expression: $expression, Bucket Number: $bucketNumber, Dry Run: $dryRun, Instant: $instantTime" + Seq(Row("SUCCESS", "OVERWRITE", details)) } /** @@ -277,13 +304,13 @@ class PartitionBucketIndexManager() extends BaseProcedure Seq(Row("SUCCESS", "SHOW_CONFIG", null)) } - override def build: Procedure = new PartitionBucketIndexManager() + override def build: 
Procedure = new PartitionBucketIndexManager } object PartitionBucketIndexManager { - val NAME = "PartitionBucketIndexManager" + val NAME = "partition_bucket_index_manager" - def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { - override def get() = new PartitionBucketIndexManager() + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new PartitionBucketIndexManager } } \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 532a000f0f6..7c1bc0aa2a2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -1847,94 +1847,6 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } - test("Test Bulk Insert Into Partition Bucket Index Table") { - withSQLConf( - "hoodie.datasource.write.operation" -> "bulk_insert", - "hoodie.bulkinsert.shuffle.parallelism" -> "1") { - Seq("mor", "cow").foreach { tableType => - Seq("true", "false").foreach { bulkInsertAsRow => - withTempDir { tmp => - val tableName = generateTableName - // Create a partitioned table - spark.sql( - s""" - |create table $tableName ( - | id int, - | dt string, - | name string, - | price double, - | ts long - |) using hudi - | tblproperties ( - | primaryKey = 'id,name', - | type = '$tableType', - | preCombineField = 'ts', - | hoodie.index.type = 'BUCKET', - | hoodie.bucket.index.hash.field = 'id,name', - | hoodie.datasource.write.row.writer.enable = '$bulkInsertAsRow') - | partitioned by (dt) - | location '${tmp.getCanonicalPath}' - """.stripMargin) - - // Note: Do not write the field alias, the partition field must be placed last. 
- spark.sql( - s""" - | insert into $tableName values - | (1, 'a1,1', 10, 1000, "2021-01-05"), - | (2, 'a2', 20, 2000, "2021-01-06"), - | (3, 'a3,3', 30, 3000, "2021-01-07") - """.stripMargin) - - checkAnswer(s"select id, name, price, ts, dt from $tableName")( - Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), - Seq(2, "a2", 20.0, 2000, "2021-01-06"), - Seq(3, "a3,3", 30.0, 3000, "2021-01-07") - ) - - // for COW with disabled Spark native row writer, multiple bulk inserts are restricted - if (tableType != "cow" && bulkInsertAsRow != "false") { - spark.sql( - s""" - | insert into $tableName values - | (1, 'a1', 10, 1000, "2021-01-05"), - | (3, "a3", 30, 3000, "2021-01-07") - """.stripMargin) - - checkAnswer(s"select id, name, price, ts, dt from $tableName")( - Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), - Seq(1, "a1", 10.0, 1000, "2021-01-05"), - Seq(2, "a2", 20.0, 2000, "2021-01-06"), - Seq(3, "a3,3", 30.0, 3000, "2021-01-07"), - Seq(3, "a3", 30.0, 3000, "2021-01-07") - ) - - // there are two files in partition(dt = '2021-01-05') - checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")( - Seq(2) - ) - - // would generate 6 other files in partition(dt = '2021-01-05') - spark.sql( - s""" - | insert into $tableName values - | (4, 'a1,1', 10, 1000, "2021-01-05"), - | (5, 'a1,1', 10, 1000, "2021-01-05"), - | (6, 'a1,1', 10, 1000, "2021-01-05"), - | (7, 'a1,1', 10, 1000, "2021-01-05"), - | (8, 'a1,1', 10, 1000, "2021-01-05"), - | (10, 'a3,3', 30, 3000, "2021-01-05") - """.stripMargin) - - checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")( - Seq(8) - ) - } - } - } - } - } - } - test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET and disabled Spark native row writer") { withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") { diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala new file mode 100644 index 00000000000..d437f4c9679 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTableWithPartitionBucketIndex.scala @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.dml + +import org.apache.hudi.index.bucket.PartitionBucketIndexUtils +import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase +import scala.collection.JavaConverters._ + +class TestInsertTableWithPartitionBucketIndex extends HoodieSparkSqlTestBase { + + /** + * Write operation including : + * Case1: Mor + bulk_insert + asRow enable + simple bucket index + * Case2: Mor + bulk_insert + asRow disable + simple bucket index + * Case3: Cow + bulk_insert + asRow enable + simple bucket index + * Case4: Cow + bulk_insert + asRow disable + simple bucket index + * Case5: Mor + bulk_insert + asRow enable + partition level bucket index + * Case6: Mor + bulk_insert + asRow disable + partition level bucket index + * Case7: Cow + bulk_insert + asRow enable + partition level bucket index + * Case8: Cow + bulk_insert + asRow disable + partition level bucket index + * + * Total WorkFlow are as followed: + * Step1: Create a new table using simple bucket index + * Step2: Upgrade table to partition level bucket index using `call partition_bucket_index_manager` + * Step3: Trigger another write operation through partition level bucket index + * + */ + test("Test Bulk Insert Into Partition Bucket Index Table") { + withSQLConf( + "hoodie.datasource.write.operation" -> "bulk_insert", + "hoodie.bulkinsert.shuffle.parallelism" -> "2") { + Seq("mor", "cow").foreach { tableType => + Seq("true", "false").foreach { bulkInsertAsRow => + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | type = '$tableType', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name', + | hoodie.bucket.index.num.buckets = 
1, + | hoodie.datasource.write.row.writer.enable = '$bulkInsertAsRow') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + + // Note: Do not write the field alias, the partition field must be placed last. + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (11, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (22, 'a2', 20, 2000, "2021-01-06") + """.stripMargin) + + // upgrade to partition level bucket index and rescale dt=2021-01-05 from 1 to 2 + val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2" + val rule = "regex" + val defaultBucketNumber = 1 + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(11, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(22, "a2", 20.0, 2000, "2021-01-06") + ) + + // insert into new partition 2021-01-07 with partition level bucket index + spark.sql( + s""" + | insert into $tableName values + | (3, 'a3,3', 30, 3000, "2021-01-07"), + | (33, 'a3,3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName") ( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(11, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(22, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3,3", 30.0, 3000, "2021-01-07"), + Seq(33, "a3,3", 30.0, 3000, "2021-01-07") + ) + val metaClient = createMetaClient(spark, tmp.getCanonicalPath) + val actual: List[String] = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList + val expected: List[String] = List("dt=2021-01-05" + "00000000", + "dt=2021-01-05" + "00000000", + "dt=2021-01-05" + "00000001", + "dt=2021-01-06" + "00000000", + "dt=2021-01-07" + 
"00000000", + "dt=2021-01-07" + "00000001") + assert(actual.sorted == expected.sorted) + } + } + } + } + } + + /** + * Write operation including : + * Case1: Mor + upsert + simple bucket index + * Case2: Cow + upsert + simple bucket index + * Case3: Mor + upsert + partition level bucket index + * Case4: Cow + upsert + partition level bucket index + * + * Total WorkFlow are as followed: + * Step1: Create a new table using simple bucket index + * Step2: Upgrade table to partition level bucket index using `call partition_bucket_index_manager` + * Step3: Trigger another write operation through partition level bucket index + * + */ + test("Test Upsert Into Partition Bucket Index Table") { + withSQLConf( + "hoodie.datasource.write.operation" -> "upsert") { + Seq("cow", "mor").foreach { tableType => + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | type = '$tableType', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name', + | hoodie.bucket.index.num.buckets = 1) + | partitioned by (dt) + | location '${tmp.getCanonicalPath}' + | """.stripMargin) + + // Note: Do not write the field alias, the partition field must be placed last. 
+ spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (11, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (22, 'a2', 20, 2000, "2021-01-06") + | """.stripMargin) + + // upgrade to partition level bucket index and rescale dt=2021-01-05 from 1 to 2 + val expressions = "dt=(2021\\-01\\-05|2021\\-01\\-07),2" + val rule = "regex" + val defaultBucketNumber = 1 + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', overwrite => '$expressions', rule => '$rule', bucketNumber => $defaultBucketNumber)") + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(11, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(22, "a2", 20.0, 2000, "2021-01-06") + ) + + // insert into new partition 2021-01-07 with partition level bucket index + // update (1, 'a1,1', 10, 1000, "2021-01-05") to (1, 'a1,1', 100, 1000, "2021-01-05") + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 100, 1000, "2021-01-05"), + | (3, 'a3,3', 30, 3000, "2021-01-07"), + | (33, 'a3,3', 30, 3000, "2021-01-07") + | """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 100.0, 1000, "2021-01-05"), + Seq(11, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(22, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3,3", 30.0, 3000, "2021-01-07"), + Seq(33, "a3,3", 30.0, 3000, "2021-01-07") + ) + val metaClient = createMetaClient(spark, tmp.getCanonicalPath) + val actual: List[String] = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient).asScala.toList + val expected: List[String] = List("dt=2021-01-05" + "00000000", + "dt=2021-01-05" + "00000000", + "dt=2021-01-05" + "00000001", + "dt=2021-01-06" + "00000000", + "dt=2021-01-07" + "00000000", + "dt=2021-01-07" + "00000001") + // compare file group as expected + assert(actual.sorted == 
expected.sorted) + } + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala index 93d2a557afe..0bf5aaaa3a4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestPartitionBucketIndexManager.scala @@ -16,7 +16,7 @@ class TestPartitionBucketIndexManager extends HoodieSparkProcedureTestBase { val defaultBucketNumber = 10 PartitionBucketIndexUtils.initHashingConfig(metaClient, expressions, rule, defaultBucketNumber, null) - spark.sql(s"""call PartitionBucketIndexManager(table => '$tableName', show-config => 'true')""").collect() + spark.sql(s"call partition_bucket_index_manager(table => '$tableName', show-config => 'true')").collect() } }
