This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fc3e81f24e7 [HUDI-5407] Following the first patch, add changes for Flink (#8204)
fc3e81f24e7 is described below
commit fc3e81f24e7d40da7b8da526db78a0351ae75dca
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 17 15:00:13 2023 +0800
[HUDI-5407] Following the first patch, add changes for Flink (#8204)
---
.../hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java | 6 ++++++
.../apache/hudi/sink/TestStreamWriteOperatorCoordinator.java | 10 ++++++----
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index cd45685e13e..3a80c574fc5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
@@ -126,6 +127,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
try (HoodieFlinkWriteClient writeClient = new
HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
+ // rollback partially failed writes if any.
+ if (writeClient.rollbackFailedWrites()) {
+ metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+ }
+
if (canTriggerTableService) {
// trigger compaction before doing the delta commit. this is to
ensure, if this delta commit succeeds in metadata table, but failed in data
table,
// we would have compacted metadata table and so could have included
uncommitted data which will never be ignored while reading from metadata
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 808e9b2d94e..7a9a6380363 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -277,7 +277,7 @@ public class TestStreamWriteOperatorCoordinator {
}
@Test
- void testSyncMetadataTableWithReusedInstant() throws Exception {
+ void testSyncMetadataTableWithRollback() throws Exception {
// reset
reset();
// override the default configuration
@@ -311,12 +311,14 @@ public class TestStreamWriteOperatorCoordinator {
metadataTableMetaClient.reloadActiveTimeline();
// write another commit with existing instant on the metadata timeline
- instant = mockWriteWithMetadata();
+ mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline =
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
- assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(3));
- assertThat(completedTimeline.lastInstant().get().getTimestamp(),
is(instant));
+ assertThat("One instant need to sync to metadata table",
completedTimeline.countInstants(), is(4));
+ assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(),
is(instant));
+ assertThat("The pending instant should be rolled back first",
+ completedTimeline.lastInstant().get().getAction(),
is(HoodieTimeline.ROLLBACK_ACTION));
}
@Test