This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fc3e81f24e7 [HUDI-5407] Following the first patch, add changes for 
Flink (#8204)
fc3e81f24e7 is described below

commit fc3e81f24e7d40da7b8da526db78a0351ae75dca
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 17 15:00:13 2023 +0800

    [HUDI-5407] Following the first patch, add changes for Flink (#8204)
---
 .../hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java    |  6 ++++++
 .../apache/hudi/sink/TestStreamWriteOperatorCoordinator.java   | 10 ++++++----
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index cd45685e13e..3a80c574fc5 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
@@ -126,6 +127,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
 
     try (HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
+      // rollback partially failed writes if any.
+      if (writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
+
       if (canTriggerTableService) {
         // trigger compaction before doing the delta commit. this is to 
ensure, if this delta commit succeeds in metadata table, but failed in data 
table,
         // we would have compacted metadata table and so could have included 
uncommitted data which will never be ignored while reading from metadata
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 808e9b2d94e..7a9a6380363 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -277,7 +277,7 @@ public class TestStreamWriteOperatorCoordinator {
   }
 
   @Test
-  void testSyncMetadataTableWithReusedInstant() throws Exception {
+  void testSyncMetadataTableWithRollback() throws Exception {
     // reset
     reset();
     // override the default configuration
@@ -311,12 +311,14 @@ public class TestStreamWriteOperatorCoordinator {
     metadataTableMetaClient.reloadActiveTimeline();
 
     // write another commit with existing instant on the metadata timeline
-    instant = mockWriteWithMetadata();
+    mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
 
     completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(3));
-    assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(instant));
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(4));
+    assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), 
is(instant));
+    assertThat("The pending instant should be rolled back first",
+        completedTimeline.lastInstant().get().getAction(), 
is(HoodieTimeline.ROLLBACK_ACTION));
   }
 
   @Test

Reply via email to