This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e35d2e83e7a [MINOR] Add log in flink compact/cluster commit sink for troubleshoot… (#8546)
e35d2e83e7a is described below

commit e35d2e83e7ad76b6d8ede0e16ed077cad11e04e9
Author: Bingeng Huang <[email protected]>
AuthorDate: Mon Sep 23 07:43:48 2024 +0800

    [MINOR] Add log in flink compact/cluster commit sink for troubleshoot… (#8546)
    
    * [MINOR] Add log in flink compact/cluster commit sink for troubleshooting.
    
    * Fix logging style
    
    * Fix NPE
    
    ---------
    
    Co-authored-by: hbg <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../apache/hudi/sink/clustering/ClusteringCommitSink.java | 15 +++++++++++++++
 .../apache/hudi/sink/compact/CompactionCommitSink.java    | 15 +++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index adaaa394481..eea30abf53b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -112,11 +112,26 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
   @Override
   public void invoke(ClusteringCommitEvent event, Context context) throws Exception {
     final String instant = event.getInstant();
+    if (event.isFailed()
+        || (event.getWriteStatuses() != null
+        && event.getWriteStatuses().stream().anyMatch(writeStatus -> writeStatus.getTotalErrorRecords() > 0))) {
+      LOG.warn("Receive abnormal ClusteringCommitEvent of instant {}, task ID is {},"
+              + " is failed: {}, error record count: {}",
+          instant, event.getTaskID(), event.isFailed(), getNumErrorRecords(event));
+    }
     commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
         .put(event.getFileIds(), event);
     commitIfNecessary(instant, commitBuffer.get(instant).values());
   }
 
+  private long getNumErrorRecords(ClusteringCommitEvent event) {
+    if (event.getWriteStatuses() == null) {
+      return -1L;
+    }
+    return event.getWriteStatuses().stream()
+        .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+  }
+
   /**
    * Condition to commit: the commit buffer has equal size with the clustering plan operations
    * and all the clustering commit event {@link ClusteringCommitEvent} has the same clustering instant time.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 5505e7b5756..85d5df160ea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -109,11 +109,26 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
   @Override
   public void invoke(CompactionCommitEvent event, Context context) throws Exception {
     final String instant = event.getInstant();
+    if (event.isFailed()
+        || (event.getWriteStatuses() != null
+        && event.getWriteStatuses().stream().anyMatch(writeStatus -> writeStatus.getTotalErrorRecords() > 0))) {
+      LOG.warn("Receive abnormal CompactionCommitEvent of instant {}, task ID is {},"
+              + " is failed: {}, error record count: {}",
+          instant, event.getTaskID(), event.isFailed(), getNumErrorRecords(event));
+    }
     commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
         .put(event.getFileId(), event);
     commitIfNecessary(instant, commitBuffer.get(instant).values());
   }
 
+  private long getNumErrorRecords(CompactionCommitEvent event) {
+    if (event.getWriteStatuses() == null) {
+      return -1L;
+    }
+    return event.getWriteStatuses().stream()
+        .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+  }
+
   /**
    * Condition to commit: the commit buffer has equal size with the compaction plan operations
    * and all the compact commit event {@link CompactionCommitEvent} has the same compaction instant time.

Reply via email to