This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e081762e8cb2 feat: add record write failure log and metrics (#13417)
e081762e8cb2 is described below
commit e081762e8cb23c12aec4cb780e2ec2bd24a6c420
Author: Peter Huang <[email protected]>
AuthorDate: Thu Dec 18 12:52:11 2025 -0800
feat: add record write failure log and metrics (#13417)
---
.../java/org/apache/hudi/client/WriteStatus.java | 2 +-
.../hudi/metrics/FlinkStreamWriteMetrics.java | 16 +++++++
.../hudi/sink/append/AppendWriteFunction.java | 27 ++++++++++-
.../hudi/sink/append/TestAppendWriteFunction.java | 52 +++++++++++++++++++++-
4 files changed, 94 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index 0a34f6134706..9d9ec618ec86 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -91,7 +91,7 @@ public class WriteStatus implements Serializable {
public WriteStatus() {
this.failureFraction = 0.0d;
this.trackSuccessRecords = false;
- this.random = null;
+ this.random = new Random(RANDOM_SEED);
this.isMetadataTable = false;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
index a3d84b234f9d..01a86f7e3e72 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java
@@ -66,6 +66,11 @@ public class FlinkStreamWriteMetrics extends HoodieFlinkMetrics {
*/
private long numOfFilesWritten;
+ /**
+ * Number of record write failure during a checkpoint window.
+ */
+ private long numOfRecordWriteFailures;
+
/**
* Number of records written per seconds.
*/
@@ -104,6 +109,8 @@ public class FlinkStreamWriteMetrics extends HoodieFlinkMetrics {
metricGroup.gauge("fileFlushTotalCosts", () -> fileFlushTotalCosts);
metricGroup.gauge("numOfFilesWritten", () -> numOfFilesWritten);
metricGroup.gauge("numOfOpenHandle", () -> numOfOpenHandle);
+ metricGroup.gauge("numOfRecordWriteFailures", () -> numOfRecordWriteFailures);
+
metricGroup.meter("handleSwitchPerSecond", handleSwitchPerSecond);
@@ -132,6 +139,14 @@ public class FlinkStreamWriteMetrics extends HoodieFlinkMetrics {
numOfFilesWritten += 1;
}
+ public void increaseNumOfRecordWriteFailure(long recordWriteFailures) {
+ numOfRecordWriteFailures += recordWriteFailures;
+ }
+
+ public long getNumOfRecordWriteFailures() {
+ return numOfRecordWriteFailures;
+ }
+
public void increaseNumOfOpenHandle() {
numOfOpenHandle += 1;
increaseNumOfFilesWritten();
@@ -165,6 +180,7 @@ public class FlinkStreamWriteMetrics extends HoodieFlinkMetrics {
this.numOfOpenHandle = 0;
this.writeBufferedSize = 0;
this.fileFlushTotalCosts = 0;
+ this.numOfRecordWriteFailures = 0;
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 0700e3ec3f22..063b16c06b45 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -19,6 +19,8 @@
package org.apache.hudi.sink.append;
import org.apache.hudi.client.WriteStatus;
+
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* Sink function to write the data to the underneath filesystem.
@@ -144,8 +147,8 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, this.currentInstant);
}
+ recordWriteFailure(writeMetrics, writeStatus);
StreamerUtil.validateWriteStatus(config, currentInstant, writeStatus);
-
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.checkpointId(this.checkpointId)
@@ -167,4 +170,26 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
writeMetrics = new FlinkStreamWriteMetrics(metrics);
writeMetrics.registerMetrics();
}
+
+ /**
+ * Update metrics and log for errors in write status.
+ *
+ * @param writeMetrics FlinkStreamWriteMetrics
+ * @param writeStatus write status from write handler
+ */
+ @VisibleForTesting
+ static void recordWriteFailure(FlinkStreamWriteMetrics writeMetrics, List<WriteStatus> writeStatus) {
+ Map.Entry<HoodieKey, Throwable> firstFailure = null;
+ for (WriteStatus status : writeStatus) {
+ writeMetrics.increaseNumOfRecordWriteFailure(status.getTotalErrorRecords());
+ if (firstFailure == null && status.getErrors().size() > 0) {
+ firstFailure = status.getErrors().entrySet().stream().findFirst().get();
+ }
+ }
+
+ // Only print the first record failure to prevent logs occupy too much disk in worst case.
+ if (firstFailure != null) {
+ LOG.error("The first record with written failure {}", firstFailure.getKey().getRecordKey(), firstFailure.getValue());
+ }
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
index 7a55509c7138..97d8ab934e0a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
@@ -31,12 +31,30 @@ import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Test cases for {@link AppendWriteFunction}.
*/
public class TestAppendWriteFunction {
+ private FlinkStreamWriteMetrics flinkStreamWriteMetrics;
+ private UnregisteredMetricsGroup metricGroup;
+
+ @BeforeEach
+ void setUp() {
+ metricGroup = new UnregisteredMetricsGroup();
+ flinkStreamWriteMetrics = new FlinkStreamWriteMetrics(metricGroup);
+ }
+
@Test
void testRecordWriteNoFailure() {
WriteStatus writeStatus = new WriteStatus();
@@ -61,7 +79,7 @@ public class TestAppendWriteFunction {
void testRecordWriteFailureValidationWithFailFast() {
WriteStatus writeStatus = new WriteStatus();
writeStatus.markFailure(
- "key1", "/partition1", new RuntimeException("test exception"));
+ "key1", "/partition1", new RuntimeException("test exception"));
List<WriteStatus> writeStatusList = Collections.singletonList(writeStatus);
Configuration configuration = new Configuration();
@@ -71,4 +89,36 @@ public class TestAppendWriteFunction {
assertThrows(HoodieException.class,
() -> StreamerUtil.validateWriteStatus(configuration, HoodieInstantTimeGenerator.getCurrentInstantTimeStr(), writeStatusList));
}
+
+ @Test
+ void testRecordWriteFailure() {
+ WriteStatus writeStatus = new WriteStatus();
+ writeStatus.markFailure(
+ "key1", "/partition1", new RuntimeException("test exception"));
+ List<WriteStatus> writeStatusList = Arrays.asList(writeStatus);
+
+ AppendWriteFunction.recordWriteFailure(flinkStreamWriteMetrics, writeStatusList);
+
+ // Verify that the failure was recorded in metrics
+ assertEquals(1, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
+ }
+
+ @Test
+ void testRecordWriteFailureMultipleErrors() {
+ WriteStatus writeStatus1 = new WriteStatus();
+ writeStatus1.markFailure("key1", "/partition1", new RuntimeException("error 1"));
+ writeStatus1.markFailure("key2", "/partition1", new RuntimeException("error 2"));
+ writeStatus1.markFailure("key3", "/partition1", new RuntimeException("error 3"));
+
+ WriteStatus writeStatus2 = new WriteStatus();
+ writeStatus2.markFailure("key4", "/partition2", new IllegalArgumentException("illegal argument"));
+ writeStatus2.markFailure("key5", "/partition2", new NullPointerException("null pointer"));
+
+ List<WriteStatus> writeStatusList = Arrays.asList(writeStatus1, writeStatus2);
+
+ AppendWriteFunction.recordWriteFailure(flinkStreamWriteMetrics, writeStatusList);
+
+ // Should record total 5 failures across both write statuses
+ assertEquals(5, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
+ }
}