This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 51d03515728 [HUDI-5857] Insert overwrite into bucket table would generate new file group id (#8072)
51d03515728 is described below

commit 51d035157289153cfc6915ae2beb0668f641db5e
Author: Jing Zhang <beyond1...@gmail.com>
AuthorDate: Fri Mar 10 16:46:24 2023 +0800

    [HUDI-5857] Insert overwrite into bucket table would generate new file group id (#8072)
---
 .../action/commit/SparkBucketIndexPartitioner.java |  11 ++
 .../SparkInsertOverwriteCommitActionExecutor.java  |  14 +++
 .../commit/SparkInsertOverwritePartitioner.java    |  15 +++
 .../TestMORDataSourceWithBucketIndex.scala         |  29 ++++++
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 114 ++++++++++++++++++++-
 5 files changed, 181 insertions(+), 2 deletions(-)
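
The gist of the change: an insert overwrite into a bucket-index table should never write back into a bucket's existing file group; instead every overwrite commit allocates a brand-new file group id that still encodes the bucket number. Below is a standalone sketch of that naming idea (the zero-padded bucket id plus a fresh UUID is an assumption here, standing in for what BucketIdentifier.newBucketFileIdPrefix is used for in the diff that follows):

    import java.util.UUID;

    // Illustration only; the real logic lives in BucketIdentifier / FSUtils.
    public class BucketFileIdSketch {

      // Zero-pad the bucket number, matching the bucketIdStr(...) call used by the partitioner.
      static String bucketIdStr(int bucketNumber) {
        return String.format("%08d", bucketNumber);
      }

      // Assumption: a new bucket file id prefix is the bucket id plus a fresh UUID, so each
      // insert-overwrite commit lands in a different file group for the same bucket.
      static String newBucketFileIdPrefix(String bucketId) {
        return bucketId + "-" + UUID.randomUUID();
      }

      public static void main(String[] args) {
        String bucketId = bucketIdStr(3);
        System.out.println(newBucketFileIdPrefix(bucketId)); // e.g. 00000003-2f6c...
        System.out.println(newBucketFileIdPrefix(bucketId)); // different file group, same bucket
      }
    }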

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index 23182587597..a246a7150c9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import scala.Tuple2;
 
@@ -42,6 +43,9 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE_TABLE;
+
 /**
  * Packs incoming records to be inserted into buckets (1 bucket = 1 RDD partition).
  */
@@ -57,6 +61,7 @@ public class SparkBucketIndexPartitioner<T> extends
    * The partition offset is a multiple of the bucket num.
    */
   private final Map<String, Integer> partitionPathOffset;
+  private final boolean isOverwrite;
 
   /**
    * Partition path and file groups in it pair. Decide the file group an incoming update should go to.
@@ -84,6 +89,8 @@ public class SparkBucketIndexPartitioner<T> extends
       i += numBuckets;
     }
     assignUpdates(profile);
+    WriteOperationType operationType = profile.getOperationType();
+    this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
   }
 
   private void assignUpdates(WorkloadProfile profile) {
@@ -106,6 +113,10 @@ public class SparkBucketIndexPartitioner<T> extends
   public BucketInfo getBucketInfo(int bucketNumber) {
     String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
     String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets);
+    // Insert overwrite always generates new bucket file id
+    if (isOverwrite) {
+      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
+    }
     Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
         .getOrDefault(partitionPath, Collections.emptySet()).stream()
         .filter(e -> e.startsWith(bucketId))
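
In words, the hunk above makes SparkBucketIndexPartitioner.getBucketInfo branch on the write operation: INSERT_OVERWRITE and INSERT_OVERWRITE_TABLE always get a fresh file group, while other operations keep looking up the bucket's current file id. A simplified, dependency-free sketch of that routing (the types below are stand-ins, not the real Hudi classes):

    import java.util.Optional;
    import java.util.Set;
    import java.util.UUID;

    class BucketRoutingSketch {
      enum BucketType { INSERT, UPDATE }

      record BucketInfo(BucketType type, String fileIdPrefix, String partitionPath) {}

      static BucketInfo getBucketInfo(boolean isOverwrite, String partitionPath,
                                      String bucketId, Set<String> existingFileIds) {
        if (isOverwrite) {
          // Insert overwrite always starts a new file group for the bucket.
          return new BucketInfo(BucketType.INSERT, bucketId + "-" + UUID.randomUUID(), partitionPath);
        }
        // Otherwise update the bucket's current file group if one exists, else insert a new one.
        Optional<String> existing = existingFileIds.stream()
            .filter(fileId -> fileId.startsWith(bucketId))
            .findFirst();
        return existing
            .map(fileId -> new BucketInfo(BucketType.UPDATE, fileId, partitionPath))
            .orElseGet(() ->
                new BucketInfo(BucketType.INSERT, bucketId + "-" + UUID.randomUUID(), partitionPath));
      }
    }
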
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index ee3b31cc577..b265b32da8e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -34,6 +34,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.spark.Partitioner;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -85,4 +86,17 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
     // because new commit is not complete. it is safe to mark all existing file Ids as old files
     return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
+
+  @Override
+  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
+    SparkHoodiePartitioner upsertPartitioner = (SparkHoodiePartitioner) partitioner;
+    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
+    BucketType btype = binfo.bucketType;
+    switch (btype) {
+      case INSERT:
+        return handleInsert(binfo.fileIdPrefix, recordItr);
+      default:
+        throw new AssertionError("Expect INSERT bucketType for insert overwrite, please correct the logic of " + partitioner.getClass().getName());
+    }
+  }
 }
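
The executor override above narrows what the insert-overwrite write path accepts from its partitioner: every bucket must come back typed as INSERT, and anything else fails fast rather than silently appending to an old file group. A minimal self-contained sketch of that guard (the enum is a stand-in for Hudi's BucketType):

    class InsertOverwriteGuardSketch {
      enum BucketType { INSERT, UPDATE }

      // Insert overwrite only ever writes new file groups, so a non-INSERT bucket
      // indicates a partitioner bug; fail fast, mirroring handleInsertPartition above.
      static void checkInsertBucket(BucketType bucketType, String partitionerName) {
        if (bucketType != BucketType.INSERT) {
          throw new AssertionError("Expect INSERT bucketType for insert overwrite, "
              + "please correct the logic of " + partitionerName);
        }
      }
    }
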
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
index 57668aada1c..5d82253da86 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
@@ -41,6 +42,20 @@ public class SparkInsertOverwritePartitioner extends UpsertPartitioner {
     super(profile, context, table, config);
   }
 
+  @Override
+  public BucketInfo getBucketInfo(int bucketNumber) {
+    BucketInfo bucketInfo = super.getBucketInfo(bucketNumber);
+    switch (bucketInfo.bucketType) {
+      case INSERT:
+        return bucketInfo;
+      case UPDATE:
+        // Insert overwrite always generates new bucket file id
+        return new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), bucketInfo.partitionPath);
+      default:
+        throw new AssertionError();
+    }
+  }
+
   /**
    * Returns a list of small files in the given partition path.
    */
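
For the regular (non-bucket-index) overwrite partitioner, the override above rewrites any UPDATE assignment coming out of UpsertPartitioner into an INSERT against a brand-new file id prefix, so the overwrite never appends to the file groups it is about to replace. A dependency-free sketch of that conversion (the types are simplified stand-ins, and the UUID prefix is an assumption standing in for FSUtils.createNewFileIdPfx):

    import java.util.UUID;

    class OverwriteBucketRewriteSketch {
      enum BucketType { INSERT, UPDATE }

      record BucketInfo(BucketType type, String fileIdPrefix, String partitionPath) {}

      // Any UPDATE assignment becomes an INSERT into a fresh file group,
      // mirroring SparkInsertOverwritePartitioner#getBucketInfo above.
      static BucketInfo toOverwriteBucket(BucketInfo bucketInfo) {
        switch (bucketInfo.type()) {
          case INSERT:
            return bucketInfo;
          case UPDATE:
            return new BucketInfo(BucketType.INSERT, UUID.randomUUID().toString(),
                bucketInfo.partitionPath());
          default:
            throw new AssertionError("Unexpected bucket type: " + bucketInfo.type());
        }
      }
    }
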
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
index 41913074d54..8fbd00022b2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
@@ -148,4 +148,33 @@ class TestMORDataSourceWithBucketIndex extends HoodieSparkClientTestBase {
     assertEquals(100,
         hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
   }
+
+  @Test def testInsertOverwrite(): Unit = {
+    val partitionPaths = new Array[String](1)
+    partitionPaths.update(0, "2020/01/10")
+    val newDataGen = new HoodieTestDataGenerator(partitionPaths)
+    val records1 = recordsToStrings(newDataGen.generateInserts("001", 100)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val records2 = recordsToStrings(newDataGen.generateInserts("002", 20)).toList
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(20, hudiSnapshotDF1.count())
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b33deebdf72..8e54e908f7a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -30,7 +29,6 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
-import org.scalatest.Inspectors.forAll
 
 import java.io.File
 
@@ -1125,4 +1123,116 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Insert Overwrite Into Bucket Index Table") {
+    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
+      Seq("mor", "cow").foreach { tableType =>
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               |) using hudi
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.index.type = 'BUCKET',
+               |  hoodie.bucket.index.num.buckets = '4'
+               |)
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}/$tableName'
+       """.stripMargin)
+
+          spark.sql(
+            s"""insert into $tableName  values
+               |(5, 'a', 35, 1000, '2021-01-05'),
+               |(1, 'a', 31, 1000, '2021-01-05'),
+               |(3, 'a', 33, 1000, '2021-01-05'),
+               |(4, 'b', 16, 1000, '2021-01-05'),
+               |(2, 'b', 18, 1000, '2021-01-05'),
+               |(6, 'b', 17, 1000, '2021-01-05'),
+               |(8, 'a', 21, 1000, '2021-01-05'),
+               |(9, 'a', 22, 1000, '2021-01-05'),
+               |(7, 'a', 23, 1000, '2021-01-05')
+               |""".stripMargin)
+
+          // Insert overwrite static partition
+          spark.sql(
+            s"""
+               | insert overwrite table $tableName partition(dt = '2021-01-05')
+               | select * from (select 13 , 'a2', 12, 1000) limit 10
+        """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
+            Seq(13, "a2", 12.0, 1000, "2021-01-05")
+          )
+        })
+      }
+    }
+  }
+
+  test("Test Insert Overwrite Into Consistent Bucket Index Table") {
+    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
+      withRecordType()(withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  dt string
+             |) using hudi
+             |tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.index.type = 'BUCKET',
+             |  hoodie.index.bucket.engine = "CONSISTENT_HASHING",
+             |  hoodie.bucket.index.num.buckets = '4'
+             |)
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}/$tableName'
+       """.stripMargin)
+
+        spark.sql(
+          s"""insert into $tableName  values
+             |(5, 'a', 35, 1000, '2021-01-05'),
+             |(1, 'a', 31, 1000, '2021-01-05'),
+             |(3, 'a', 33, 1000, '2021-01-05'),
+             |(4, 'b', 16, 1000, '2021-01-05'),
+             |(2, 'b', 18, 1000, '2021-01-05'),
+             |(6, 'b', 17, 1000, '2021-01-05'),
+             |(8, 'a', 21, 1000, '2021-01-05'),
+             |(9, 'a', 22, 1000, '2021-01-05'),
+             |(7, 'a', 23, 1000, '2021-01-05')
+             |""".stripMargin)
+
+        // Insert overwrite static partition
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt = '2021-01-05')
+             | select * from (select 13 , 'a2', 12, 1000) limit 10
+        """.stripMargin)
+
+        // Double insert overwrite static partition
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt = '2021-01-05')
+             | select * from (select 13 , 'a3', 12, 1000) limit 10
+        """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
+          Seq(13, "a3", 12.0, 1000, "2021-01-05")
+        )
+      })
+    }
+  }
 }
