This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4f875edaecd [HUDI-7139] Fix operation type for bulk insert with row writer in Hudi Streamer (#10175)
4f875edaecd is described below
commit 4f875edaecd495eaa8996fa8d81c102a971c599f
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat Nov 25 15:10:37 2023 -0800
[HUDI-7139] Fix operation type for bulk insert with row writer in Hudi Streamer (#10175)
This commit fixes a bug that caused the `operationType` to be null in the
commit metadata of bulk insert operations with the row writer enabled in
Hudi Streamer (`hoodie.datasource.write.row.writer.enable=true`).
`HoodieStreamerDatasetBulkInsertCommitActionExecutor` is updated so that
`#preExecute` and `#afterExecute` run the same logic as the regular bulk
insert operation without the row writer.
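As context, here is a minimal sketch (not part of this patch) of enabling
the row-writer bulk insert path that this change targets. The property key
comes from the commit message above; the class name RowWriterBulkInsertProps
and the surrounding Hudi Streamer wiring (e.g. launching with the
BULK_INSERT operation) are illustrative assumptions.

    import org.apache.hudi.common.config.TypedProperties;

    public class RowWriterBulkInsertProps {
      public static TypedProperties rowWriterProps() {
        TypedProperties props = new TypedProperties();
        // Routes bulk inserts through the row-writer path handled by
        // HoodieStreamerDatasetBulkInsertCommitActionExecutor.
        props.setProperty("hoodie.datasource.write.row.writer.enable", "true");
        return props;
      }
    }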
---
...odieStreamerDatasetBulkInsertCommitActionExecutor.java | 10 ++--------
.../utilities/deltastreamer/TestHoodieDeltaStreamer.java | 15 ++++++++++++---
2 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index 5593a95ca39..2a5113538e4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -26,9 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -44,12 +42,8 @@ public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends BaseDat
@Override
protected void preExecute() {
- // no op
- }
-
- @Override
- protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
- // no op
+ table.validateInsertSchema();
+ writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
}
@Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index f5304cce808..62aa7328fbb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1377,7 +1377,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
if (i == 2 || i == 4) { // this validation reloads the timeline. So, we are validating only for first and last batch.
// validate commit metadata for all completed commits to have valid schema in extra metadata.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
- metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient));
+ metaClient.reloadActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants().getInstants()
+ .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
+ entry, metaClient, WriteOperationType.BULK_INSERT));
}
}
} finally {
@@ -1743,15 +1746,21 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
// validate commit metadata for all completed commits to have valid schema in extra metadata.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
- metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient));
+ metaClient.reloadActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants().getInstants()
+ .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
+ entry, metaClient, WriteOperationType.INSERT));
testNum++;
}
- private void assertValidSchemaInCommitMetadata(HoodieInstant instant, HoodieTableMetaClient metaClient) {
+ private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant instant,
+ HoodieTableMetaClient metaClient,
+ WriteOperationType operationType) {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)));
+ assertEquals(operationType, commitMetadata.getOperationType());
} catch (IOException ioException) {
throw new HoodieException("Failed to parse commit metadata for " + instant.toString());
}
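As a usage note, here is a minimal standalone sketch (not part of this
patch) of reading back the operation type from the latest completed commit,
mirroring the test helper above. The table path is a placeholder, and the
class name OperationTypeCheck is an illustrative assumption.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;

    public class OperationTypeCheck {
      public static void main(String[] args) throws Exception {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setBasePath("/tmp/hudi-table")  // placeholder table path
            .setConf(new Configuration())
            .build();
        // Latest completed commit on the active timeline.
        HoodieInstant latest = metaClient.getActiveTimeline().getCommitsTimeline()
            .filterCompletedInstants().lastInstant().get();
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
            metaClient.getActiveTimeline().getInstantDetails(latest).get(),
            HoodieCommitMetadata.class);
        // Before this fix, row-writer bulk inserts left the operation type
        // null; with the fix it is WriteOperationType.BULK_INSERT.
        System.out.println("operationType = " + commitMetadata.getOperationType());
      }
    }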