This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ed6b7f6aedc [HUDI-4406] Support Flink compaction/clustering write 
error resolution to avoid data loss (#6121)
ed6b7f6aedc is described below

commit ed6b7f6aedc2cba0f753a4ee130cef860ecb0801
Author: Chenshizhi <[email protected]>
AuthorDate: Tue Feb 14 18:15:18 2023 +0800

    [HUDI-4406] Support Flink compaction/clustering write error resolution to 
avoid data loss (#6121)
---
 .../main/java/org/apache/hudi/configuration/FlinkOptions.java |  6 +++---
 .../org/apache/hudi/sink/clustering/ClusteringCommitSink.java | 11 +++++++++++
 .../org/apache/hudi/sink/compact/CompactionCommitSink.java    | 11 +++++++++++
 3 files changed, 25 insertions(+), 3 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e447692fc98..9cdeb963d53 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -382,9 +382,9 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.ignore.failed")
       .booleanType()
       .defaultValue(false)
-      .withDescription("Flag to indicate whether to ignore any non exception 
error (e.g. writestatus error). within a checkpoint batch.\n"
-          + "By default false.  Turning this on, could hide the write status 
errors while the spark checkpoint moves ahead. \n"
-          + "  So, would recommend users to use this with caution.");
+      .withDescription("Flag to indicate whether to ignore any non exception 
error (e.g. writestatus error). within a checkpoint batch. \n"
+          + "By default false. Turning this on, could hide the write status 
errors while the flink checkpoint moves ahead. \n"
+          + "So, would recommend users to use this with caution.");
 
   public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index eb567d89f18..3f392de1527 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -147,6 +147,17 @@ public class ClusteringCommitSink extends 
CleanFunction<ClusteringCommitEvent> {
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
+    long numErrorRecords = 
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+    if (numErrorRecords > 0 && 
!this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+      // handle failure case
+      LOG.error("Got {} error records during clustering of instant {},\n"
+          + "option '{}' is configured as false,"
+          + "rolls back the clustering", numErrorRecords, instant, 
FlinkOptions.IGNORE_FAILED.key());
+      ClusteringUtil.rollbackClustering(table, writeClient, instant);
+      return;
+    }
+
     HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new 
HoodieWriteMetadata<>();
     writeMetadata.setWriteStatuses(statuses);
     
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 1e05dce6076..0e9bc54f8fb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -157,6 +157,17 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
+    long numErrorRecords = 
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+    if (numErrorRecords > 0 && 
!this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+      // handle failure case
+      LOG.error("Got {} error records during compaction of instant {},\n"
+          + "option '{}' is configured as false,"
+          + "rolls back the compaction", numErrorRecords, instant, 
FlinkOptions.IGNORE_FAILED.key());
+      CompactionUtil.rollbackCompaction(table, instant);
+      return;
+    }
+
     HoodieCommitMetadata metadata = 
CompactHelpers.getInstance().createCompactionMetadata(
         table, instant, HoodieListData.eager(statuses), 
writeClient.getConfig().getSchema());
 

Reply via email to