This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ed6b7f6aedc [HUDI-4406] Support Flink compaction/clustering write
error resolvement to avoid data loss (#6121)
ed6b7f6aedc is described below
commit ed6b7f6aedc2cba0f753a4ee130cef860ecb0801
Author: Chenshizhi <[email protected]>
AuthorDate: Tue Feb 14 18:15:18 2023 +0800
[HUDI-4406] Support Flink compaction/clustering write error resolvement to
avoid data loss (#6121)
---
.../main/java/org/apache/hudi/configuration/FlinkOptions.java | 6 +++---
.../org/apache/hudi/sink/clustering/ClusteringCommitSink.java | 11 +++++++++++
.../org/apache/hudi/sink/compact/CompactionCommitSink.java | 11 +++++++++++
3 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e447692fc98..9cdeb963d53 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -382,9 +382,9 @@ public class FlinkOptions extends HoodieConfig {
.key("write.ignore.failed")
.booleanType()
.defaultValue(false)
- .withDescription("Flag to indicate whether to ignore any non exception
error (e.g. writestatus error). within a checkpoint batch.\n"
- + "By default false. Turning this on, could hide the write status
errors while the spark checkpoint moves ahead. \n"
- + " So, would recommend users to use this with caution.");
+ .withDescription("Flag to indicate whether to ignore any non exception
error (e.g. writestatus error). within a checkpoint batch. \n"
+ + "By default false. Turning this on, could hide the write status
errors while the flink checkpoint moves ahead. \n"
+ + "So, would recommend users to use this with caution.");
public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index eb567d89f18..3f392de1527 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -147,6 +147,17 @@ public class ClusteringCommitSink extends
CleanFunction<ClusteringCommitEvent> {
.flatMap(Collection::stream)
.collect(Collectors.toList());
+ long numErrorRecords =
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+ if (numErrorRecords > 0 &&
!this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+ // handle failure case
+ LOG.error("Got {} error records during clustering of instant {},\n"
+ + "option '{}' is configured as false,"
+ + "rolls back the clustering", numErrorRecords, instant,
FlinkOptions.IGNORE_FAILED.key());
+ ClusteringUtil.rollbackClustering(table, writeClient, instant);
+ return;
+ }
+
HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new
HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(statuses);
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 1e05dce6076..0e9bc54f8fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -157,6 +157,17 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
.flatMap(Collection::stream)
.collect(Collectors.toList());
+ long numErrorRecords =
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+ if (numErrorRecords > 0 &&
!this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+ // handle failure case
+ LOG.error("Got {} error records during compaction of instant {},\n"
+ + "option '{}' is configured as false,"
+ + "rolls back the compaction", numErrorRecords, instant,
FlinkOptions.IGNORE_FAILED.key());
+ CompactionUtil.rollbackCompaction(table, instant);
+ return;
+ }
+
HoodieCommitMetadata metadata =
CompactHelpers.getInstance().createCompactionMetadata(
table, instant, HoodieListData.eager(statuses),
writeClient.getConfig().getSchema());