This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 016bcf769b6 [MINOR] Optimize print write error msg in
StreamWriteOperatorCoordinator#doCommit (#10809)
016bcf769b6 is described below
commit 016bcf769b6ade87aa551f81432b22a09799b339
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Wed Apr 10 08:59:01 2024 +0800
[MINOR] Optimize print write error msg in
StreamWriteOperatorCoordinator#doCommit (#10809)
---
.../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 26ca245f8be..61feb294b49 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -588,12 +588,17 @@ public class StreamWriteOperatorCoordinator
}
} else {
LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/"
+ totalRecords);
- LOG.error("The first 100 error messages");
-
writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
- LOG.error("Global error for partition path {} and fileID {}: {}",
- ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
+ LOG.error("The first 10 files with write errors:");
+
writeResults.stream().filter(WriteStatus::hasErrors).limit(10).forEach(ws -> {
+ if (ws.getGlobalError() != null) {
+ LOG.error("Global error for partition path {} and fileID {}: {}",
+ ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError());
+ }
if (ws.getErrors().size() > 0) {
- ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" +
key + " and value " + value));
+ LOG.error("The first 100 records-level errors for partition path {}
and fileID {}:",
+ ws.getPartitionPath(), ws.getFileId());
+ ws.getErrors().entrySet().stream().limit(100).forEach(entry ->
LOG.error("Error for key: "
+ + entry.getKey() + " and Exception: " +
entry.getValue().getMessage()));
}
});
// Rolls back instant