This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e081762e8cb2 feat: add record write failure log and metrics (#13417)
e081762e8cb2 is described below

commit e081762e8cb23c12aec4cb780e2ec2bd24a6c420
Author: Peter Huang <[email protected]>
AuthorDate: Thu Dec 18 12:52:11 2025 -0800

    feat: add record write failure log and metrics (#13417)
---
 .../java/org/apache/hudi/client/WriteStatus.java   |  2 +-
 .../hudi/metrics/FlinkStreamWriteMetrics.java      | 16 +++++++
 .../hudi/sink/append/AppendWriteFunction.java      | 27 ++++++++++-
 .../hudi/sink/append/TestAppendWriteFunction.java  | 52 +++++++++++++++++++++-
 4 files changed, 94 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index 0a34f6134706..9d9ec618ec86 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -91,7 +91,7 @@ public class WriteStatus implements Serializable {
   public WriteStatus() {
     this.failureFraction = 0.0d;
     this.trackSuccessRecords = false;
-    this.random = null;
+    this.random = new Random(RANDOM_SEED);
     this.isMetadataTable = false;
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
index a3d84b234f9d..01a86f7e3e72 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
@@ -66,6 +66,11 @@ public class FlinkStreamWriteMetrics extends 
HoodieFlinkMetrics {
    */
   private long numOfFilesWritten;
 
+  /**
+   * Number of record write failures during a checkpoint window.
+   */
+  private long numOfRecordWriteFailures;
+
   /**
    * Number of records written per seconds.
    */
@@ -104,6 +109,8 @@ public class FlinkStreamWriteMetrics extends 
HoodieFlinkMetrics {
     metricGroup.gauge("fileFlushTotalCosts", () -> fileFlushTotalCosts);
     metricGroup.gauge("numOfFilesWritten", () -> numOfFilesWritten);
     metricGroup.gauge("numOfOpenHandle", () -> numOfOpenHandle);
+    metricGroup.gauge("numOfRecordWriteFailures", () -> 
numOfRecordWriteFailures);
+
 
     metricGroup.meter("handleSwitchPerSecond", handleSwitchPerSecond);
 
@@ -132,6 +139,14 @@ public class FlinkStreamWriteMetrics extends 
HoodieFlinkMetrics {
     numOfFilesWritten += 1;
   }
 
+  public void increaseNumOfRecordWriteFailure(long recordWriteFailures) {
+    numOfRecordWriteFailures += recordWriteFailures;
+  }
+
+  public long getNumOfRecordWriteFailures() {
+    return numOfRecordWriteFailures;
+  }
+
   public void increaseNumOfOpenHandle() {
     numOfOpenHandle += 1;
     increaseNumOfFilesWritten();
@@ -165,6 +180,7 @@ public class FlinkStreamWriteMetrics extends 
HoodieFlinkMetrics {
     this.numOfOpenHandle = 0;
     this.writeBufferedSize = 0;
     this.fileFlushTotalCosts = 0;
+    this.numOfRecordWriteFailures = 0;
   }
 
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 0700e3ec3f22..063b16c06b45 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.sink.append;
 
 import org.apache.hudi.client.WriteStatus;
+
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Sink function to write the data to the underneath filesystem.
@@ -144,8 +147,8 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
       LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, 
this.currentInstant);
     }
 
+    recordWriteFailure(writeMetrics, writeStatus);
     StreamerUtil.validateWriteStatus(config, currentInstant, writeStatus);
-
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .checkpointId(this.checkpointId)
@@ -167,4 +170,26 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     writeMetrics = new FlinkStreamWriteMetrics(metrics);
     writeMetrics.registerMetrics();
   }
+
+  /**
+   * Update metrics and log for errors in write status.
+   *
+   * @param writeMetrics FlinkStreamWriteMetrics
+   * @param writeStatus write status from write handler
+   */
+  @VisibleForTesting
+  static void recordWriteFailure(FlinkStreamWriteMetrics writeMetrics, 
List<WriteStatus> writeStatus) {
+    Map.Entry<HoodieKey, Throwable> firstFailure = null;
+    for (WriteStatus status : writeStatus) {
+      
writeMetrics.increaseNumOfRecordWriteFailure(status.getTotalErrorRecords());
+      if (firstFailure == null && status.getErrors().size() > 0) {
+        firstFailure = 
status.getErrors().entrySet().stream().findFirst().get();
+      }
+    }
+
+    // Only print the first record failure to prevent logs from occupying too much disk space in the worst case.
+    if (firstFailure != null) {
+      LOG.error("The first record with written failure {}", 
firstFailure.getKey().getRecordKey(), firstFailure.getValue());
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
index 7a55509c7138..97d8ab934e0a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
@@ -31,12 +31,30 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Test cases for {@link AppendWriteFunction}.
  */
 public class TestAppendWriteFunction {
 
+  private FlinkStreamWriteMetrics flinkStreamWriteMetrics;
+  private UnregisteredMetricsGroup metricGroup;
+
+  @BeforeEach
+  void setUp() {
+    metricGroup = new UnregisteredMetricsGroup();
+    flinkStreamWriteMetrics = new FlinkStreamWriteMetrics(metricGroup);
+  }
+
   @Test
   void testRecordWriteNoFailure() {
     WriteStatus writeStatus = new WriteStatus();
@@ -61,7 +79,7 @@ public class TestAppendWriteFunction {
   void testRecordWriteFailureValidationWithFailFast() {
     WriteStatus writeStatus = new WriteStatus();
     writeStatus.markFailure(
-            "key1", "/partition1", new RuntimeException("test exception"));
+        "key1", "/partition1", new RuntimeException("test exception"));
     List<WriteStatus> writeStatusList = Collections.singletonList(writeStatus);
 
     Configuration configuration = new Configuration();
@@ -71,4 +89,36 @@ public class TestAppendWriteFunction {
     assertThrows(HoodieException.class,
         () -> StreamerUtil.validateWriteStatus(configuration, 
HoodieInstantTimeGenerator.getCurrentInstantTimeStr(), writeStatusList));
   }
+
+  @Test
+  void testRecordWriteFailure() {
+    WriteStatus writeStatus = new WriteStatus();
+    writeStatus.markFailure(
+        "key1", "/partition1", new RuntimeException("test exception"));
+    List<WriteStatus> writeStatusList = Arrays.asList(writeStatus);
+
+    AppendWriteFunction.recordWriteFailure(flinkStreamWriteMetrics, 
writeStatusList);
+
+    // Verify that the failure was recorded in metrics
+    assertEquals(1, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
+  }
+
+  @Test
+  void testRecordWriteFailureMultipleErrors() {
+    WriteStatus writeStatus1 = new WriteStatus();
+    writeStatus1.markFailure("key1", "/partition1", new 
RuntimeException("error 1"));
+    writeStatus1.markFailure("key2", "/partition1", new 
RuntimeException("error 2"));
+    writeStatus1.markFailure("key3", "/partition1", new 
RuntimeException("error 3"));
+
+    WriteStatus writeStatus2 = new WriteStatus();
+    writeStatus2.markFailure("key4", "/partition2", new 
IllegalArgumentException("illegal argument"));
+    writeStatus2.markFailure("key5", "/partition2", new 
NullPointerException("null pointer"));
+
+    List<WriteStatus> writeStatusList = Arrays.asList(writeStatus1, 
writeStatus2);
+
+    AppendWriteFunction.recordWriteFailure(flinkStreamWriteMetrics, 
writeStatusList);
+
+    // Should record total 5 failures across both write statuses
+    assertEquals(5, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
+  }
 }

Reply via email to