This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 51d03515728  [HUDI-5857] Insert overwrite into bucket table would generate new file group id (#8072)
51d03515728 is described below

commit 51d035157289153cfc6915ae2beb0668f641db5e
Author: Jing Zhang <beyond1...@gmail.com>
AuthorDate: Fri Mar 10 16:46:24 2023 +0800

    [HUDI-5857] Insert overwrite into bucket table would generate new file group id (#8072)
---
 .../action/commit/SparkBucketIndexPartitioner.java |  11 ++
 .../SparkInsertOverwriteCommitActionExecutor.java  |  14 +++
 .../commit/SparkInsertOverwritePartitioner.java    |  15 +++
 .../TestMORDataSourceWithBucketIndex.scala         |  29 ++++++
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 114 ++++++++++++++++++++-
 5 files changed, 181 insertions(+), 2 deletions(-)
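For context before the diffs: with Hudi's BUCKET index, the first eight characters of a file group id encode the bucket number a record key hashes to, and the rest is random. The fix below leans on that layout so an insert overwrite can allocate a brand-new file group that still lands in the same bucket. A minimal sketch of the id scheme, assuming the two helpers approximate BucketIdentifier.bucketIdStr and BucketIdentifier.newBucketFileIdPrefix (this is not Hudi source):

    import java.util.UUID

    object BucketFileIdSketch {
      // Zero-padded bucket number, e.g. 3 -> "00000003".
      def bucketIdStr(bucketNumber: Int): String = f"$bucketNumber%08d"

      // Fresh random tail under the same bucket prefix: a new file group
      // in the same bucket (approximation of the real helper).
      def newBucketFileIdPrefix(bucketId: String): String =
        bucketId + UUID.randomUUID().toString.drop(8)

      def main(args: Array[String]): Unit = {
        val id1 = newBucketFileIdPrefix(bucketIdStr(3))
        val id2 = newBucketFileIdPrefix(bucketIdStr(3))
        assert(id1 != id2)                 // a distinct file group per overwrite
        assert(id1.take(8) == id2.take(8)) // but the bucket prefix is stable
      }
    }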
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index 23182587597..a246a7150c9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import scala.Tuple2;
@@ -42,6 +43,9 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE_TABLE;
+
 /**
  * Packs incoming records to be inserted into buckets (1 bucket = 1 RDD partition).
  */
@@ -57,6 +61,7 @@ public class SparkBucketIndexPartitioner<T> extends
    * The partition offset is a multiple of the bucket num.
    */
   private final Map<String, Integer> partitionPathOffset;
+  private final boolean isOverwrite;
 
   /**
    * Partition path and file groups in it pair. Decide the file group an incoming update should go to.
@@ -84,6 +89,8 @@ public class SparkBucketIndexPartitioner<T> extends
       i += numBuckets;
     }
     assignUpdates(profile);
+    WriteOperationType operationType = profile.getOperationType();
+    this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
   }
 
   private void assignUpdates(WorkloadProfile profile) {
@@ -106,6 +113,10 @@ public class SparkBucketIndexPartitioner<T> extends
   public BucketInfo getBucketInfo(int bucketNumber) {
     String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
     String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets);
+    // Insert overwrite always generates a new bucket file id
+    if (isOverwrite) {
+      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
+    }
     Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
         .getOrDefault(partitionPath, Collections.emptySet()).stream()
         .filter(e -> e.startsWith(bucketId))
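In prose, the partitioner change above adds one dispatch rule: when the workload's WriteOperationType is INSERT_OVERWRITE or INSERT_OVERWRITE_TABLE, every bucket becomes an INSERT into a freshly generated file group instead of an UPDATE of an existing one. A hedged distillation of that rule (names are illustrative, not Hudi's):

    object BucketDispatchSketch {
      sealed trait BucketType
      case object INSERT extends BucketType
      case object UPDATE extends BucketType

      // isOverwrite mirrors the new field derived from the workload profile's
      // operation type in the constructor above.
      def bucketTypeFor(isOverwrite: Boolean, existingFileId: Option[String]): BucketType =
        if (isOverwrite) INSERT                   // always a new file group id
        else if (existingFileId.isDefined) UPDATE // route to the existing file group
        else INSERT                               // first write into this bucket
    }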
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index ee3b31cc577..b265b32da8e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -34,6 +34,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.spark.Partitioner;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -85,4 +86,17 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
     // because the new commit is not complete, it is safe to mark all existing file ids as old files
     return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
+
+  @Override
+  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
+    SparkHoodiePartitioner upsertPartitioner = (SparkHoodiePartitioner) partitioner;
+    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
+    BucketType btype = binfo.bucketType;
+    switch (btype) {
+      case INSERT:
+        return handleInsert(binfo.fileIdPrefix, recordItr);
+      default:
+        throw new AssertionError("Expected INSERT bucket type for insert overwrite; please correct the logic of " + partitioner.getClass().getName());
+    }
+  }
 }
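For the user-facing scenario the executor change serves, condensed from the tests further down (table name and location are illustrative; assumes a SparkSession with Hudi support bound to spark):

    // A bucket-indexed, partitioned Hudi table.
    spark.sql(
      """create table t1 (id int, name string, ts long, dt string) using hudi
        |tblproperties (
        |  type = 'mor', primaryKey = 'id', preCombineField = 'ts',
        |  hoodie.index.type = 'BUCKET', hoodie.bucket.index.num.buckets = '4'
        |) partitioned by (dt) location '/tmp/t1'
        |""".stripMargin)
    spark.sql("insert into t1 values (1, 'a', 1000, '2021-01-05')")
    // With this patch the overwrite is routed to new file groups, so the
    // partitioner hands the executor INSERT buckets only; any UPDATE bucket
    // now fails fast via the AssertionError added above.
    spark.sql("insert overwrite table t1 partition(dt = '2021-01-05') select 2, 'b', 1000")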
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
index 57668aada1c..5d82253da86 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
@@ -41,6 +42,20 @@ public class SparkInsertOverwritePartitioner extends UpsertPartitioner {
     super(profile, context, table, config);
   }
 
+  @Override
+  public BucketInfo getBucketInfo(int bucketNumber) {
+    BucketInfo bucketInfo = super.getBucketInfo(bucketNumber);
+    switch (bucketInfo.bucketType) {
+      case INSERT:
+        return bucketInfo;
+      case UPDATE:
+        // Insert overwrite always generates a new bucket file id
+        return new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), bucketInfo.partitionPath);
+      default:
+        throw new AssertionError();
+    }
+  }
+
   /**
    * Returns a list of small files in the given partition path.
    */
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
index 41913074d54..8fbd00022b2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
@@ -148,4 +148,33 @@ class TestMORDataSourceWithBucketIndex extends HoodieSparkClientTestBase {
     assertEquals(100, hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
   }
+
+  @Test def testInsertOverwrite(): Unit = {
+    val partitionPaths = new Array[String](1)
+    partitionPaths.update(0, "2020/01/10")
+    val newDataGen = new HoodieTestDataGenerator(partitionPaths)
+    val records1 = recordsToStrings(newDataGen.generateInserts("001", 100)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being the same
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val records2 = recordsToStrings(newDataGen.generateInserts("002", 20)).toList
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(20, hudiSnapshotDF1.count())
+  }
 }
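One way to double-check that an overwrite really produced new file groups, beyond the row count asserted above, is the _hoodie_file_name metadata column, which embeds the file group id; it should change across the two writes while the eight-character bucket prefix stays stable. An illustrative sketch, not part of the patch:

    val before = spark.read.format("org.apache.hudi").load(basePath)
      .select("_hoodie_file_name").distinct().collect().map(_.getString(0)).toSet
    // ... run the insert overwrite write shown above ...
    val after = spark.read.format("org.apache.hudi").load(basePath)
      .select("_hoodie_file_name").distinct().collect().map(_.getString(0)).toSet
    assert(before.intersect(after).isEmpty) // every file group was replaced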
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b33deebdf72..8e54e908f7a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -30,7 +29,6 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
-import org.scalatest.Inspectors.forAll
 
 import java.io.File
 
@@ -1125,4 +1123,116 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Insert Overwrite Into Bucket Index Table") {
+    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
+      Seq("mor", "cow").foreach { tableType =>
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               |) using hudi
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.index.type = 'BUCKET',
+               |  hoodie.bucket.index.num.buckets = '4'
+               |)
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}/$tableName'
+               """.stripMargin)
+
+          spark.sql(
+            s"""insert into $tableName values
+               |(5, 'a', 35, 1000, '2021-01-05'),
+               |(1, 'a', 31, 1000, '2021-01-05'),
+               |(3, 'a', 33, 1000, '2021-01-05'),
+               |(4, 'b', 16, 1000, '2021-01-05'),
+               |(2, 'b', 18, 1000, '2021-01-05'),
+               |(6, 'b', 17, 1000, '2021-01-05'),
+               |(8, 'a', 21, 1000, '2021-01-05'),
+               |(9, 'a', 22, 1000, '2021-01-05'),
+               |(7, 'a', 23, 1000, '2021-01-05')
+               |""".stripMargin)
+
+          // Insert overwrite static partition
+          spark.sql(
+            s"""
+               | insert overwrite table $tableName partition(dt = '2021-01-05')
+               | select * from (select 13, 'a2', 12, 1000) limit 10
+               """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
+            Seq(13, "a2", 12.0, 1000, "2021-01-05")
+          )
+        })
+      }
+    }
+  }
+
+  test("Test Insert Overwrite Into Consistent Bucket Index Table") {
+    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
+      withRecordType()(withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  dt string
+             |) using hudi
+             |tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.index.type = 'BUCKET',
+             |  hoodie.index.bucket.engine = 'CONSISTENT_HASHING',
+             |  hoodie.bucket.index.num.buckets = '4'
+             |)
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}/$tableName'
+             """.stripMargin)
+
+        spark.sql(
+          s"""insert into $tableName values
+             |(5, 'a', 35, 1000, '2021-01-05'),
+             |(1, 'a', 31, 1000, '2021-01-05'),
+             |(3, 'a', 33, 1000, '2021-01-05'),
+             |(4, 'b', 16, 1000, '2021-01-05'),
+             |(2, 'b', 18, 1000, '2021-01-05'),
+             |(6, 'b', 17, 1000, '2021-01-05'),
+             |(8, 'a', 21, 1000, '2021-01-05'),
+             |(9, 'a', 22, 1000, '2021-01-05'),
+             |(7, 'a', 23, 1000, '2021-01-05')
+             |""".stripMargin)
+
+        // Insert overwrite static partition
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt = '2021-01-05')
+             | select * from (select 13, 'a2', 12, 1000) limit 10
+             """.stripMargin)
+
+        // Insert overwrite the same static partition a second time
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt = '2021-01-05')
+             | select * from (select 13, 'a3', 12, 1000) limit 10
+             """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
+          Seq(13, "a3", 12.0, 1000, "2021-01-05")
+        )
+      })
+    }
+  }
 }
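Note that the consistent-hashing test overwrites the same partition twice: the second overwrite must again resolve to fresh file groups rather than updating the ones created by the first. A compact, illustrative way to inspect the resulting layout (tablePath is hypothetical):

    import org.apache.spark.sql.functions.countDistinct

    spark.read.format("org.apache.hudi").load(tablePath)
      .groupBy("_hoodie_partition_path")
      .agg(countDistinct("_hoodie_file_name").as("fileGroups"))
      .show(false)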