This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a9225bd9093 [HUDI-6515] Fix bucket index row writer write record to wrong handle (#9156)
a9225bd9093 is described below
commit a9225bd9093e8be8f9ccdd07b0bc542994ffc4c6
Author: StreamingFlames <[email protected]>
AuthorDate: Mon Jul 10 17:46:32 2023 +0800
[HUDI-6515] Fix bucket index row writer write record to wrong handle (#9156)
Co-authored-by: fuqijun <[email protected]>
---
.../BucketBulkInsertDataInternalWriterHelper.java | 8 ++++----
.../apache/spark/sql/hudi/TestInsertTable.scala | 23 +++++++++++++++++++++-
2 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index d6e83c7213a..e09c5a74205 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -45,7 +45,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
private Pair<UTF8String, Integer> lastFileId; // for efficient code path
// p -> (fileId -> handle)
- private final Map<String, HoodieRowCreateHandle> handles;
+ private final Map<Pair<UTF8String, Integer>, HoodieRowCreateHandle> handles;
private final String indexKeyFields;
private final int bucketNum;
@@ -66,7 +66,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
Pair<UTF8String, Integer> fileId = Pair.of(partitionPath, bucketId);
if (lastFileId == null || !lastFileId.equals(fileId)) {
LOG.info("Creating new file for partition path " + partitionPath);
-      handle = getBucketRowCreateHandle(String.valueOf(partitionPath), bucketId);
+      handle = getBucketRowCreateHandle(fileId, bucketId);
lastFileId = fileId;
}
handle.write(row);
@@ -91,13 +91,13 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
}
}
-  protected HoodieRowCreateHandle getBucketRowCreateHandle(String fileId, int bucketId) throws Exception {
+  protected HoodieRowCreateHandle getBucketRowCreateHandle(Pair<UTF8String, Integer> fileId, int bucketId) throws Exception {
     if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
if (this.arePartitionRecordsSorted) {
// if records are sorted, we can close all existing handles
close();
}
-      HoodieRowCreateHandle rowCreateHandle = new HoodieRowCreateHandle(hoodieTable, writeConfig, fileId, getNextBucketFileId(bucketId),
+      HoodieRowCreateHandle rowCreateHandle = new HoodieRowCreateHandle(hoodieTable, writeConfig, String.valueOf(fileId.getLeft()), getNextBucketFileId(bucketId),
           instantTime, taskPartitionId, taskId, taskEpochId, structType, shouldPreserveHoodieMetadata);
handles.put(fileId, rowCreateHandle);
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 950210f6d38..7cd90145507 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1192,7 +1192,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test Bulk Insert Into Bucket Index Table") {
-    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
+    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
Seq("mor", "cow").foreach { tableType =>
Seq("true", "false").foreach { bulkInsertAsRow =>
withTempDir { tmp =>
@@ -1247,6 +1247,27 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
Seq(3, "a3", 30.0, 3000, "2021-01-07")
)
+
+ // there are two files in partition(dt = '2021-01-05')
+          checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+ Seq(2)
+ )
+
+ // would generate 6 other files in partition(dt = '2021-01-05')
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (4, 'a1,1', 10, 1000, "2021-01-05"),
+ | (5, 'a1,1', 10, 1000, "2021-01-05"),
+ | (6, 'a1,1', 10, 1000, "2021-01-05"),
+ | (7, 'a1,1', 10, 1000, "2021-01-05"),
+ | (8, 'a1,1', 10, 1000, "2021-01-05"),
+ | (9, 'a3,3', 30, 3000, "2021-01-05")
+ """.stripMargin)
+
+          checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+ Seq(8)
+ )
}
}
}