[ https://issues.apache.org/jira/browse/HUDI-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381809#comment-17381809 ]
ASF GitHub Bot commented on HUDI-1860:
--------------------------------------
nsivabalan commented on a change in pull request #3184:
URL: https://github.com/apache/hudi/pull/3184#discussion_r670990427
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -455,6 +459,12 @@ public void refreshTimeline() throws IOException {
case BULK_INSERT:
writeStatusRDD = writeClient.bulkInsert(records, instantTime);
break;
+ case INSERT_OVERWRITE:
+ writeStatusRDD = writeClient.insertOverwrite(records, instantTime).getWriteStatuses();
+ break;
+ case INSERT_OVERWRITE_TABLE:
Review comment:
Do we plan to add "DELETE_PARTITION" in a follow-up PR? I'm not looking to
expand the scope of this PR, just asking.
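For readers following this hunk: the new overwrite branches call a method that returns a result object, which the diff unwraps with getWriteStatuses() so that every branch feeds the same writeStatusRDD into the commit step. Below is a minimal self-contained sketch of that dispatch. It uses hypothetical stand-in types (Operation, WriteStatus, OverwriteResult, WriteClient) instead of the real Hudi classes, and plain Lists instead of JavaRDDs. The INSERT_OVERWRITE_TABLE branch body is cut off in the hunk, so the sketch assumes it mirrors the INSERT_OVERWRITE branch.

```java
import java.util.List;
import java.util.Map;

public class OverwriteDispatchSketch {

  // Hypothetical stand-ins for the types DeltaSync uses; not the real Hudi classes.
  enum Operation { INSERT, BULK_INSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }

  record WriteStatus(String partition, String fileId) {}

  // Assumed shape of an overwrite result: the new write statuses plus the
  // file IDs replaced in each partition.
  record OverwriteResult(List<WriteStatus> writeStatuses,
                         Map<String, List<String>> replacedFileIds) {
    List<WriteStatus> getWriteStatuses() {
      return writeStatuses;
    }
  }

  interface WriteClient {
    List<WriteStatus> insert(List<String> records, String instantTime);
    List<WriteStatus> bulkInsert(List<String> records, String instantTime);
    OverwriteResult insertOverwrite(List<String> records, String instantTime);
    OverwriteResult insertOverwriteTable(List<String> records, String instantTime);
  }

  // Every branch yields a List<WriteStatus>, so the commit step after the
  // switch does not need to know which operation produced it.
  static List<WriteStatus> write(WriteClient client, Operation op,
                                 List<String> records, String instantTime) {
    switch (op) {
      case INSERT:
        return client.insert(records, instantTime);
      case BULK_INSERT:
        return client.bulkInsert(records, instantTime);
      case INSERT_OVERWRITE:
        return client.insertOverwrite(records, instantTime).getWriteStatuses();
      case INSERT_OVERWRITE_TABLE:
        return client.insertOverwriteTable(records, instantTime).getWriteStatuses();
      default:
        throw new IllegalArgumentException("Unsupported operation: " + op);
    }
  }
}
```

A DELETE_PARTITION case, if added later, would slot in as one more branch that has to produce the same List<WriteStatus>.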
##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
##########
@@ -163,6 +163,26 @@ private boolean allowWriteClientAccess(DagNode dagNode) {
}
}
+ public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
Review comment:
Awesome that you fixed the test suite as well :)
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1695,6 +1695,54 @@ public void testJdbcSourceIncrementalFetchInContinuousMode() {
}
}
+ @Test
+ public void testInsertOverwrite() throws Exception {
+ String tableBasePath = dfsBasePath + "/insert_overwrite";
+ // Initial insert
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ // No new data => no commits.
+ cfg.sourceLimit = 0;
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ // insert overwrite
+ cfg.sourceLimit = 1000;
+ cfg.operation = WriteOperationType.INSERT_OVERWRITE;
Review comment:
- I see a lot of commonality between these two tests. Can we reuse code
as much as possible?
- INSERT_OVERWRITE overwrites only the partitions that match the incoming
records, but I guess we are not really testing that. Do you think we can test
this scenario?
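The partition-scoped behavior described in the second point can be modeled in memory to show the scenario a test would need to exercise: a batch that touches only some partitions must leave the other partitions intact under INSERT_OVERWRITE, while INSERT_OVERWRITE_TABLE drops them. This is a toy sketch, not Hudi code; the Rec type, the partition paths, and the map-of-lists "table" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OverwriteSemanticsSketch {

  // Hypothetical record: a record key and the partition path it belongs to.
  record Rec(String partition, String key) {}

  // Toy "table": partition path -> record keys, sorted by partition for stable output.
  static Map<String, List<String>> group(List<Rec> records) {
    Map<String, List<String>> byPartition = new TreeMap<>();
    for (Rec r : records) {
      byPartition.computeIfAbsent(r.partition(), p -> new ArrayList<>()).add(r.key());
    }
    return byPartition;
  }

  // INSERT_OVERWRITE: only partitions present in the incoming batch are replaced.
  static Map<String, List<String>> insertOverwrite(Map<String, List<String>> table,
                                                   List<Rec> incoming) {
    Map<String, List<String>> result = new TreeMap<>(table);
    result.putAll(group(incoming));
    return result;
  }

  // INSERT_OVERWRITE_TABLE: existing contents are discarded; the batch becomes the table.
  static Map<String, List<String>> insertOverwriteTable(Map<String, List<String>> table,
                                                        List<Rec> incoming) {
    return group(incoming);
  }

  public static void main(String[] args) {
    Map<String, List<String>> table = group(List.of(
        new Rec("2021/07/01", "a"), new Rec("2021/07/02", "b")));
    // The overwrite batch touches only one of the two partitions.
    List<Rec> batch = List.of(new Rec("2021/07/01", "c"));

    System.out.println(insertOverwrite(table, batch));      // {2021/07/01=[c], 2021/07/02=[b]}
    System.out.println(insertOverwriteTable(table, batch)); // {2021/07/01=[c]}
  }
}
```

A DeltaStreamer test could mirror this by writing the overwrite batch into a subset of the original partitions and asserting that the record counts of the remaining partitions are unchanged.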
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -475,8 +485,8 @@ public void refreshTimeline() throws IOException {
LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
-
- boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
+ String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
Review comment:
Since you folks did some investigation around this, do you think we need
to improve the docs on WriteClient.startCommitWithTime()? There are two
overloaded methods; clearly calling out when to call which one would
benefit everyone.
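The diff derives the action type from the operation and the table type via CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType)). A self-contained sketch of that kind of mapping follows. It assumes that overwrite operations are recorded on the timeline as "replacecommit" instants, while other operations use "commit" (copy-on-write) or "deltacommit" (merge-on-read), and it uses local stand-in enums rather than Hudi's. Presumably the action type used when starting the commit (via the overload that accepts one) has to match the one used when completing it, which is the kind of guidance the reviewer is asking to document.

```java
public class CommitActionTypeSketch {

  // Local stand-ins for Hudi's operation and table-type enums (subset only).
  enum Operation { INSERT, UPSERT, BULK_INSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }

  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  // Assumed mapping: overwrites replace file groups, so they get their own action;
  // everything else falls back to the table type's default action.
  static String commitActionType(Operation op, TableType tableType) {
    switch (op) {
      case INSERT_OVERWRITE:
      case INSERT_OVERWRITE_TABLE:
        return "replacecommit";
      default:
        return tableType == TableType.COPY_ON_WRITE ? "commit" : "deltacommit";
    }
  }

  public static void main(String[] args) {
    System.out.println(commitActionType(Operation.INSERT_OVERWRITE, TableType.MERGE_ON_READ)); // replacecommit
    System.out.println(commitActionType(Operation.UPSERT, TableType.MERGE_ON_READ));           // deltacommit
  }
}
```

Under this assumption, starting an INSERT_OVERWRITE with the overload that takes only the instant time would default to the table type's action, so it would not line up with the "replacecommit" used at completion.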
> Add INSERT_OVERWRITE support to DeltaStreamer
> ---------------------------------------------
>
> Key: HUDI-1860
> URL: https://issues.apache.org/jira/browse/HUDI-1860
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: Sagar Sumit
> Assignee: Samrat Deb
> Priority: Major
> Labels: pull-request-available
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> As discussed in [this
> RFC|https://cwiki.apache.org/confluence/display/HUDI/RFC+-+14+%3A+JDBC+incremental+puller],
> having the full fetch mode use insert_overwrite when writing during sync
> would be better, as it can handle schema changes.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)