This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e35d2e83e7a [MINOR] Add log in flink compact/cluster commit sink for troubleshoot… (#8546)
e35d2e83e7a is described below
commit e35d2e83e7ad76b6d8ede0e16ed077cad11e04e9
Author: Bingeng Huang <[email protected]>
AuthorDate: Mon Sep 23 07:43:48 2024 +0800
[MINOR] Add log in flink compact/cluster commit sink for troubleshoot… (#8546)
* [MINOR] Add log in flink compact/cluster commit sink for troubleshooting.
* Fix logging style
* Fix NPE
---------
Co-authored-by: hbg <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../apache/hudi/sink/clustering/ClusteringCommitSink.java | 15 +++++++++++++++
.../apache/hudi/sink/compact/CompactionCommitSink.java | 15 +++++++++++++++
2 files changed, 30 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index adaaa394481..eea30abf53b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -112,11 +112,26 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
@Override
public void invoke(ClusteringCommitEvent event, Context context) throws Exception {
final String instant = event.getInstant();
+ if (event.isFailed()
+ || (event.getWriteStatuses() != null
+ && event.getWriteStatuses().stream().anyMatch(writeStatus -> writeStatus.getTotalErrorRecords() > 0))) {
+ LOG.warn("Receive abnormal ClusteringCommitEvent of instant {}, task ID is {},"
+ + " is failed: {}, error record count: {}",
+ instant, event.getTaskID(), event.isFailed(), getNumErrorRecords(event));
+ }
commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
.put(event.getFileIds(), event);
commitIfNecessary(instant, commitBuffer.get(instant).values());
}
+ private long getNumErrorRecords(ClusteringCommitEvent event) {
+ if (event.getWriteStatuses() == null) {
+ return -1L;
+ }
+ return event.getWriteStatuses().stream()
+ .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+ }
+
/**
* Condition to commit: the commit buffer has equal size with the clustering plan operations
* and all the clustering commit event {@link ClusteringCommitEvent} has the same clustering instant time.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 5505e7b5756..85d5df160ea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -109,11 +109,26 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
@Override
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
final String instant = event.getInstant();
+ if (event.isFailed()
+ || (event.getWriteStatuses() != null
+ && event.getWriteStatuses().stream().anyMatch(writeStatus -> writeStatus.getTotalErrorRecords() > 0))) {
+ LOG.warn("Receive abnormal CompactionCommitEvent of instant {}, task ID is {},"
+ + " is failed: {}, error record count: {}",
+ instant, event.getTaskID(), event.isFailed(), getNumErrorRecords(event));
+ }
commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
.put(event.getFileId(), event);
commitIfNecessary(instant, commitBuffer.get(instant).values());
}
+ private long getNumErrorRecords(CompactionCommitEvent event) {
+ if (event.getWriteStatuses() == null) {
+ return -1L;
+ }
+ return event.getWriteStatuses().stream()
+ .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+ }
+
/**
* Condition to commit: the commit buffer has equal size with the compaction plan operations
* and all the compact commit event {@link CompactionCommitEvent} has the same compaction instant time.