This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a9225bd9093 [HUDI-6515] Fix bucket index row writer write record to wrong handle (#9156)
a9225bd9093 is described below

commit a9225bd9093e8be8f9ccdd07b0bc542994ffc4c6
Author: StreamingFlames <[email protected]>
AuthorDate: Mon Jul 10 17:46:32 2023 +0800

    [HUDI-6515] Fix bucket index row writer write record to wrong handle (#9156)
    
    Co-authored-by: fuqijun <[email protected]>
---
 .../BucketBulkInsertDataInternalWriterHelper.java  |  8 ++++----
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 23 +++++++++++++++++++++-
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index d6e83c7213a..e09c5a74205 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -45,7 +45,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
 
   private Pair<UTF8String, Integer> lastFileId; // for efficient code path
   // p -> (fileId -> handle)
-  private final Map<String, HoodieRowCreateHandle> handles;
+  private final Map<Pair<UTF8String, Integer>, HoodieRowCreateHandle> handles;
   private final String indexKeyFields;
   private final int bucketNum;
 
@@ -66,7 +66,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
       Pair<UTF8String, Integer> fileId = Pair.of(partitionPath, bucketId);
       if (lastFileId == null || !lastFileId.equals(fileId)) {
         LOG.info("Creating new file for partition path " + partitionPath);
-        handle = getBucketRowCreateHandle(String.valueOf(partitionPath), bucketId);
+        handle = getBucketRowCreateHandle(fileId, bucketId);
         lastFileId = fileId;
       }
       handle.write(row);
@@ -91,13 +91,13 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
     }
   }
 
-  protected HoodieRowCreateHandle getBucketRowCreateHandle(String fileId, int bucketId) throws Exception {
+  protected HoodieRowCreateHandle getBucketRowCreateHandle(Pair<UTF8String, Integer> fileId, int bucketId) throws Exception {
     if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
       if (this.arePartitionRecordsSorted) {
         // if records are sorted, we can close all existing handles
         close();
       }
-      HoodieRowCreateHandle rowCreateHandle = new HoodieRowCreateHandle(hoodieTable, writeConfig, fileId, getNextBucketFileId(bucketId),
+      HoodieRowCreateHandle rowCreateHandle = new HoodieRowCreateHandle(hoodieTable, writeConfig, String.valueOf(fileId.getLeft()), getNextBucketFileId(bucketId),
           instantTime, taskPartitionId, taskId, taskEpochId, structType, shouldPreserveHoodieMetadata);
       handles.put(fileId, rowCreateHandle);
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 950210f6d38..7cd90145507 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1192,7 +1192,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test Bulk Insert Into Bucket Index Table") {
-    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
+    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
       Seq("mor", "cow").foreach { tableType =>
         Seq("true", "false").foreach { bulkInsertAsRow =>
           withTempDir { tmp =>
@@ -1247,6 +1247,27 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
               Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
               Seq(3, "a3", 30.0, 3000, "2021-01-07")
             )
+
+            // there are two files in partition(dt = '2021-01-05')
+            checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+              Seq(2)
+            )
+
+            // would generate 6 other files in partition(dt = '2021-01-05')
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (4, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (5, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (6, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (7, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (8, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (9, 'a3,3', 30, 3000, "2021-01-05")
+               """.stripMargin)
+
+            checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+              Seq(8)
+            )
           }
         }
       }

Reply via email to