yunqingmoswu commented on code in PR #6752:
URL: https://github.com/apache/inlong/pull/6752#discussion_r1046626735
##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -196,8 +216,39 @@ private void checkFlushException() {
}
}
+ void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
+ }
+ if (!dirtyOptions.ignoreDirty()) {
+ RuntimeException ex;
+ if (e instanceof RuntimeException) {
+ ex = (RuntimeException) e;
+ } else {
+ ex = new RuntimeException(e);
+ }
+ throw ex;
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setIdentifier(dirtyOptions.getIdentifier());
Review Comment:
Please add 'setDirtyMessage'.
##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -147,6 +160,13 @@ public void open(int taskNumber, int numTasks) throws IOException {
if (metricOption != null) {
sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
}
+ if (dirtySink != null) {
+ try {
Review Comment:
Exceptions cannot be ignored during initialization.
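
A minimal sketch of what failing fast could look like here. `Sink` and `OpenFailFastSketch` are hypothetical stand-ins, not the real dirty sink or `JdbcBatchingOutputFormat`, and the actual signature of the sink's open call is an assumption. Since `open` already declares `IOException`, the failure is wrapped and rethrown instead of being logged and dropped:

```java
import java.io.IOException;

final class OpenFailFastSketch {

    /** Hypothetical stand-in for the dirty sink; the real open() signature may differ. */
    interface Sink {
        void open() throws Exception;
    }

    private final Sink dirtySink;

    OpenFailFastSketch(Sink dirtySink) {
        this.dirtySink = dirtySink;
    }

    /** Mirrors the shape of open(...) in the diff: a null sink is allowed, a failing sink is not. */
    void open() throws IOException {
        if (dirtySink != null) {
            try {
                dirtySink.open();
            } catch (Exception e) {
                // Surface the failure so the task does not start with a half-initialized sink.
                throw new IOException("Failed to open dirty sink.", e);
            }
        }
    }
}
```

With this shape a misconfigured dirty sink stops the task at startup rather than failing later on the first dirty record.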
##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -196,8 +216,39 @@ private void checkFlushException() {
}
}
+ void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
+ if (sinkMetricData != null) {
Review Comment:
Dirty metrics should be reported after the '!dirtyOptions.ignoreDirty()' check.
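
A minimal sketch of the suggested ordering. `Metrics` and `DirtyOrderingSketch` are hypothetical stand-ins for `SinkMetricData` and the output format, and the single-argument `invokeDirty` is a simplification of the two-argument call in the diff. The exception is rethrown first when dirty data is not ignored, so the dirty counter only grows for records that are actually skipped:

```java
final class DirtyOrderingSketch {

    /** Hypothetical stand-in for SinkMetricData; it only counts dirty rows. */
    static final class Metrics {
        long dirtyRows;

        void invokeDirty(long rowCount) {
            dirtyRows += rowCount;
        }
    }

    private final Metrics metrics = new Metrics();
    private final boolean ignoreDirty;

    DirtyOrderingSketch(boolean ignoreDirty) {
        this.ignoreDirty = ignoreDirty;
    }

    long dirtyRows() {
        return metrics.dirtyRows;
    }

    void handleDirtyData(Object dirtyData, Exception e) {
        if (!ignoreDirty) {
            // Not ignoring dirty data: fail the write and leave the dirty metric untouched.
            throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
        }
        // Only reached when the record is skipped as dirty data.
        metrics.invokeDirty(1);
    }
}
```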
##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java:
##########
@@ -214,11 +265,8 @@ public final synchronized void writeRecord(In record) throws IOException {
resetStateAfterFlush();
}
} catch (Exception e) {
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(rowSize, dataSize);
- }
- resetStateAfterFlush();
- throw new IOException("Writing records to JDBC failed.", e);
+ LOG.error(String.format("serialize error, raw data: %s", record), e);
+ handleDirtyData(record, DirtyType.SERIALIZE_ERROR, e);
Review Comment:
Is this the right dirty data type?