This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4271e01e2e159617e7b22f891f519c1870bf2e5f Author: zhuanshenbsj1 <[email protected]> AuthorDate: Wed Apr 10 08:59:01 2024 +0800 [MINOR] Optimize print write error msg in StreamWriteOperatorCoordinator#doCommit (#10809) --- .../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 8d2cf38ed0a..d2912895df7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -586,12 +586,17 @@ public class StreamWriteOperatorCoordinator } } else { LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); - LOG.error("The first 100 error messages"); - writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> { - LOG.error("Global error for partition path {} and fileID {}: {}", - ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId()); + LOG.error("The first 10 files with write errors:"); + writeResults.stream().filter(WriteStatus::hasErrors).limit(10).forEach(ws -> { + if (ws.getGlobalError() != null) { + LOG.error("Global error for partition path {} and fileID {}: {}", + ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError()); + } if (ws.getErrors().size() > 0) { - ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value)); + LOG.error("The first 100 records-level errors for partition path {} and fileID {}:", + ws.getPartitionPath(), ws.getFileId()); + ws.getErrors().entrySet().stream().limit(100).forEach(entry -> LOG.error("Error for key: " + + entry.getKey() + " and Exception: " + entry.getValue().getMessage())); } }); // Rolls back instant
