This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 10a153b22 [INLONG-7249][Sort] JDBC accurate dirty data archive and 
metric calculation (#7580)
10a153b22 is described below

commit 10a153b228d9615958066d901191f2c1448e1f8d
Author: Yizhou Yang <[email protected]>
AuthorDate: Tue Apr 11 17:02:50 2023 +0800

    [INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation 
(#7580)
    
    Co-authored-by: Yizhou Yang <[email protected]>
---
 .../sort/base/dirty/sink/log/LogDirtySink.java     |   2 +
 .../inlong/sort/base/dirty/RegexReplaceTest.java   |  17 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  66 +++++-
 .../internal/JdbcMultiBatchingOutputFormat.java    | 195 +++++++++++++++-
 .../internal/TableMetricStatementExecutor.java     | 249 +++++++++++++++++++++
 5 files changed, 506 insertions(+), 23 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index 2884ac398..942e83da1 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -89,6 +89,8 @@ public class LogDirtySink<T> implements DirtySink<T> {
                 RowData.FieldGetter[] getters = fieldGetters;
                 if (rowType != null) {
                     getters = FormatUtils.parseFieldGetters(rowType);
+                } else {
+                    return data.toString();
                 }
                 value = FormatUtils.csvFormat(data, getters, labels, 
fieldDelimiter);
                 break;
diff --git 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
index 2554d0202..bc1061c81 100644
--- 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
+++ 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
@@ -21,19 +21,16 @@ import lombok.extern.slf4j.Slf4j;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-
 @Slf4j
 public class RegexReplaceTest {
 
     @Test
-    public void testRegexReplacement() throws IOException {
-        String[] identifier = new String[2];
-        identifier[0] = "yizhouyang";
-        identifier[1] = "table2";
-        String pattern = "${database}-${table}-${DIRTY_MESSAGE}";
-        String answer = DirtySinkHelper.regexReplace(pattern, 
DirtyType.BATCH_LOAD_ERROR, "mock message", identifier[0],
-                identifier[1], null);
-        Assert.assertEquals("yizhouyang-table2-mock message", answer);
+    public void testRegexReplacement() {
+        String database = "database1";
+        String table = "table2";
+        String pattern = "${source.table}-${source.database}-${DIRTY_MESSAGE}";
+        String answer = DirtySinkHelper.regexReplace(pattern, 
DirtyType.BATCH_LOAD_ERROR, "mock message", database,
+                table, null);
+        Assert.assertEquals("table2-database1-mock message", answer);
     }
 }
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 07ea97e70..f7c2f74e5 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -28,10 +28,15 @@ import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
 import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
 import 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
 import org.apache.flink.connector.jdbc.utils.JdbcUtils;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -42,6 +47,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
@@ -56,6 +62,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.HashMap;
@@ -184,7 +191,8 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData != null) {
+                                            // report is only needed when 
TableMetricExecutor is not initialized
+                                            if (sinkMetricData != null && 
dirtySink == null) {
                                                 sinkMetricData.invoke(rowSize, 
dataSize);
                                             }
                                             resetStateAfterFlush();
@@ -204,6 +212,16 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
     private JdbcExec createAndOpenStatementExecutor(
             StatementExecutorFactory<JdbcExec> statementExecutorFactory) 
throws IOException {
         JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+        if (dirtySink != null) {
+            try {
+                JdbcExec newExecutor = enhanceExecutor(exec);
+                if (newExecutor != null) {
+                    exec = newExecutor;
+                }
+            } catch (Exception e) {
+                LOG.error("tableStatementExecutor enhance failed", e);
+            }
+        }
         try {
             exec.prepareStatements(connectionProvider.getConnection());
         } catch (SQLException e) {
@@ -277,7 +295,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData != null) {
+                if (sinkMetricData != null && dirtySink == null) {
                     sinkMetricData.invoke(rowSize, dataSize);
                 }
                 resetStateAfterFlush();
@@ -303,7 +321,6 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
         try {
             jdbcStatementExecutor.addToBatch(extracted);
         } catch (Exception e) {
-            LOG.error(String.format("DataTypeMappingError, data: %s", 
extracted), e);
             handleDirtyData(extracted, DirtyType.DATA_TYPE_MAPPING_ERROR, e);
         }
     }
@@ -365,6 +382,49 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
         }
     }
 
+    /**
+     *  Use reflection to initialize TableMetricStatementExecutor, and replace 
the original executor
+     *  or upsertExecutor to calculate metrics.
+     */
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws 
NoSuchFieldException, IllegalAccessException {
+        if (dirtySink == null) {
+            return null;
+        }
+        final DirtySinkHelper dirtySinkHelper = new 
DirtySinkHelper<>(dirtyOptions, dirtySink);
+        // enhance the actual executor to TableMetricStatementExecutor
+        Field executorType;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            executorType = 
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            executorType = 
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " 
+ exec.getClass());
+        }
+        executorType.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) 
executorType.get(exec);
+        // get the factory and rowconverter to initialize 
TableMetricStatementExecutor.
+        Field statementFactory = 
TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = 
TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) 
statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) 
rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, 
dirtySinkHelper, sinkMetricData);
+        // for TableBufferedStatementExecutor, replace the executor
+        if (exec instanceof TableBufferedStatementExecutor) {
+            Field transform = 
TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            transform.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, 
RowData>) transform.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        // replace the sub-executor that generates flinkSQL for executors such 
as
+        // TableBufferReducedExecutor or InsertOrUpdateExecutor
+        executorType.set(exec, newExecutor);
+        return null;
+    }
+
     protected void attemptFlush() throws SQLException {
         jdbcStatementExecutor.executeBatch();
     }
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index e812eeeff..96ab15ed6 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -28,9 +28,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import 
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -43,7 +48,9 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.MetricOption;
@@ -58,6 +65,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.sql.Timestamp;
@@ -108,6 +116,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
     private transient RuntimeContext runtimeContext;
+    private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
     private JdbcDmlOptions dmlOptions;
     private JdbcOptions jdbcOptions;
     private boolean appendMode;
@@ -118,7 +127,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
     private transient Map<String, List<GenericRowData>> recordsMap = new 
HashMap<>();
     private transient Map<String, Exception> tableExceptionMap = new 
HashMap<>();
     private transient Boolean stopWritingWhenTableException;
-
     private transient ListState<MetricState> metricStateListState;
     private final String sinkMultipleFormat;
     private final String databasePattern;
@@ -256,7 +264,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
         if (null != pkNameMap.get(tableIdentifier)) {
             pkNameList = pkNameMap.get(tableIdentifier);
         }
-        StatementExecutorFactory<JdbcExec> statementExecutorFactory = null;
+        StatementExecutorFactory<JdbcExec> statementExecutorFactory;
         if (CollectionUtils.isNotEmpty(pkNameList) && !appendMode) {
             // upsert query
             JdbcDmlOptions createDmlOptions = JdbcDmlOptions.builder()
@@ -294,6 +302,19 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
                 return null;
             }
             connectionExecProviderMap.put(tableIdentifier, 
tableConnectionProvider);
+
+            if (!stopWritingWhenTableException && 
dirtySinkHelper.getDirtySink() != null) {
+                try {
+                    JdbcExec newExecutor = enhanceExecutor(jdbcExec);
+                    if (newExecutor != null) {
+                        jdbcExec = newExecutor;
+                    }
+                } catch (Exception e) {
+                    LOG.warn("enhance executor failed for class :" +
+                            jdbcExec.getClass(), e);
+                }
+            }
+
             
jdbcExec.prepareStatements(tableConnectionProvider.getConnection());
         } catch (Exception e) {
             return null;
@@ -302,6 +323,49 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
         return jdbcExec;
     }
 
+    /**
+     *  Use reflection to initialize TableMetricStatementExecutor, and replace 
the original executor
+     *  or upsertExecutor to calculate metrics.
+     */
+    private JdbcExec enhanceExecutor(JdbcExec exec) throws 
NoSuchFieldException, IllegalAccessException {
+        if (dirtySinkHelper.getDirtySink() == null) {
+            return null;
+        }
+        // enhance the actual executor to TableMetricStatementExecutor
+        Field subExecutor;
+        if (exec instanceof TableBufferReducedStatementExecutor) {
+            subExecutor = 
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+        } else if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = 
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+        } else {
+            throw new RuntimeException("table enhance failed, can't enhance " 
+ exec.getClass());
+        }
+        subExecutor.setAccessible(true);
+        TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor) 
subExecutor.get(exec);
+        // get the StatementFactory and JdbcRowConverter in order to initialize 
TableMetricStatementExecutor
+        Field statementFactory = 
TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+        Field rowConverter = 
TableSimpleStatementExecutor.class.getDeclaredField("converter");
+        statementFactory.setAccessible(true);
+        rowConverter.setAccessible(true);
+        final StatementFactory stmtFactory = (StatementFactory) 
statementFactory.get(executor);
+        final JdbcRowConverter converter = (JdbcRowConverter) 
rowConverter.get(executor);
+        TableMetricStatementExecutor newExecutor =
+                new TableMetricStatementExecutor(stmtFactory, converter, 
dirtySinkHelper, sinkMetricData);
+        newExecutor.setMultipleSink(true);
+        // replace the original TableBufferedStatementExecutor with metric 
executor
+        if (exec instanceof TableBufferedStatementExecutor) {
+            subExecutor = 
TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+            subExecutor.setAccessible(true);
+            Function<RowData, RowData> valueTransform = (Function<RowData, 
RowData>) subExecutor.get(exec);
+            newExecutor.setValueTransform(valueTransform);
+            return (JdbcExec) newExecutor;
+        }
+        // replace the sub-executor that generates flinkSQL for executors such 
as
+        // TableBufferReducedExecutor or InsertOrUpdateExecutor
+        subExecutor.set(exec, newExecutor);
+        return null;
+    }
+
     public void getAndSetPkNamesFromDb(String tableIdentifier) {
         try {
             AbstractJdbcDialect jdbcDialect = (AbstractJdbcDialect) 
jdbcOptions.getDialect();
@@ -332,12 +396,12 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
     @Override
     public final synchronized void writeRecord(In row) throws IOException {
         checkFlushException();
-        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
-                (JsonDynamicSchemaFormat) 
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) 
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+
         if (row instanceof RowData) {
             RowData rowData = (RowData) row;
             JsonNode rootNode = 
jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
-            String tableIdentifier = null;
+            String tableIdentifier;
             try {
                 if (StringUtils.isBlank(schemaPattern)) {
                     tableIdentifier = StringUtils.join(
@@ -354,7 +418,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
                 return;
             }
 
-            GenericRowData record = null;
+            GenericRowData record;
             try {
                 RowType rowType = 
jsonDynamicSchemaFormat.extractSchema(rootNode);
                 if (rowType != null) {
@@ -392,6 +456,58 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
         }
     }
 
+    private void fillDirtyData(JdbcExec exec, String tableIdentifier) {
+        String[] identifiers = tableIdentifier.split("\\.");
+        String database;
+        String table;
+        String schema = null;
+        if (identifiers.length == 2) {
+            database = identifiers[0];
+            table = identifiers[1];
+        } else {
+            database = identifiers[0];
+            schema = identifiers[1];
+            table = identifiers[2];
+        }
+        TableMetricStatementExecutor executor = null;
+        try {
+            Field subExecutor;
+            if (exec instanceof TableMetricStatementExecutor) {
+                executor = (TableMetricStatementExecutor) exec;
+            } else if (exec instanceof TableBufferReducedStatementExecutor) {
+                subExecutor = 
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) 
subExecutor.get(exec);
+            } else if (exec instanceof TableBufferedStatementExecutor) {
+                subExecutor = 
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+                subExecutor.setAccessible(true);
+                executor = (TableMetricStatementExecutor) 
subExecutor.get(exec);
+            }
+        } catch (Exception e) {
+            LOG.error("parse executor failed", e);
+        }
+
+        try {
+            DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+            String dirtyLabel = 
DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), 
DirtyType.BATCH_LOAD_ERROR, null,
+                    database, table, schema);
+            String dirtyLogTag =
+                    DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), 
DirtyType.BATCH_LOAD_ERROR, null,
+                            database, table, schema);
+            String dirtyIdentifier =
+                    DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), 
DirtyType.BATCH_LOAD_ERROR,
+                            null, database, table, schema);
+
+            if (executor != null) {
+                executor.setDirtyMetaData(dirtyLabel, dirtyLogTag, 
dirtyIdentifier);
+            } else {
+                LOG.error("executor is null, can not set metaData");
+            }
+        } catch (Exception e) {
+            LOG.error("filling dirty metadata failed", e);
+        }
+    }
+
     /**
      * Convert fieldMap(data) to GenericRowData with rowType(schema)
      */
@@ -510,20 +626,33 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
             if (CollectionUtils.isEmpty(tableIdRecordList)) {
                 continue;
             }
-            JdbcExec jdbcStatementExecutor = null;
+            JdbcExec jdbcStatementExecutor;
             Boolean flushFlag = false;
             Exception tableException = null;
             try {
+                getAndSetPkNamesFromDb(tableIdentifier);
                 jdbcStatementExecutor = 
getOrCreateStatementExecutor(tableIdentifier);
                 Long totalDataSize = 0L;
                 for (GenericRowData record : tableIdRecordList) {
                     totalDataSize = totalDataSize + 
record.toString().getBytes(StandardCharsets.UTF_8).length;
                     jdbcStatementExecutor.addToBatch((JdbcIn) record);
                 }
+                if (dirtySinkHelper.getDirtySink() != null) {
+                    fillDirtyData(jdbcStatementExecutor, tableIdentifier);
+                }
                 jdbcStatementExecutor.executeBatch();
                 flushFlag = true;
-                outputMetrics(tableIdentifier, 
Long.valueOf(tableIdRecordList.size()),
-                        totalDataSize, false);
+                if (dirtySinkHelper.getDirtySink() == null) {
+                    outputMetrics(tableIdentifier, 
Long.valueOf(tableIdRecordList.size()),
+                            totalDataSize, false);
+                } else {
+                    try {
+                        outputMetrics(tableIdentifier);
+                    } catch (Exception e) {
+                        outputMetrics(tableIdentifier, 
Long.valueOf(tableIdRecordList.size()),
+                                totalDataSize, false);
+                    }
+                }
             } catch (Exception e) {
                 tableException = e;
                 LOG.warn("Flush all data for tableIdentifier:{} get err:", 
tableIdentifier, e);
@@ -547,7 +676,18 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
                             jdbcStatementExecutor.executeBatch();
                             Long totalDataSize =
                                     
Long.valueOf(record.toString().getBytes(StandardCharsets.UTF_8).length);
-                            outputMetrics(tableIdentifier, 1L, totalDataSize, 
false);
+                            if (dirtySinkHelper.getDirtySink() == null) {
+                                outputMetrics(tableIdentifier, (long) 
tableIdRecordList.size(),
+                                        totalDataSize, false);
+                            } else {
+                                try {
+                                    outputMetrics(tableIdentifier);
+                                } catch (Exception e) {
+                                    LOG.error("JDBC table metric calculation 
exception", e);
+                                    outputMetrics(tableIdentifier, (long) 
tableIdRecordList.size(),
+                                            totalDataSize, false);
+                                }
+                            }
                             flushFlag = true;
                             break;
                         } catch (Exception e) {
@@ -611,6 +751,41 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
         }
     }
 
+    private void outputMetrics(String tableIdentifier) throws 
NoSuchFieldException, IllegalAccessException {
+        String[] fieldArray = tableIdentifier.split("\\.");
+        // throw an exception if the executor is not enhanced
+        JdbcExec executor = jdbcExecMap.get(tableIdentifier);
+        Field subExecutor;
+        if (executor instanceof TableBufferReducedStatementExecutor) {
+            subExecutor = 
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+            subExecutor.setAccessible(true);
+            executor = (JdbcExec) subExecutor.get(executor);
+        } else if (executor instanceof TableBufferedStatementExecutor) {
+            subExecutor = 
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+            subExecutor.setAccessible(true);
+            executor = (JdbcExec) subExecutor.get(executor);
+        }
+        Field metricField = 
TableMetricStatementExecutor.class.getDeclaredField("metric");
+        long[] metrics = (long[]) metricField.get(executor);
+        long cleanCount = metrics[0];
+        long cleanSize = metrics[1];
+        long dirtyCount = metrics[2];
+        long dirtySize = metrics[3];
+
+        if (fieldArray.length == 3) {
+            sinkMetricData.outputDirtyMetrics(fieldArray[0], fieldArray[1], 
fieldArray[2],
+                    dirtyCount, dirtySize);
+            sinkMetricData.outputMetrics(fieldArray[0], fieldArray[1], 
fieldArray[2],
+                    cleanCount, cleanSize);
+        } else if (fieldArray.length == 2) {
+            sinkMetricData.outputDirtyMetrics(fieldArray[0], null, 
fieldArray[1],
+                    dirtyCount, dirtySize);
+            sinkMetricData.outputMetrics(fieldArray[0], null, fieldArray[1],
+                    cleanCount, cleanSize);
+        }
+        metricField.set(executor, new long[4]);
+    }
+
     /**
      * Executes prepared statement and closes all resources of this instance.
      */
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
new file mode 100644
index 000000000..c5ae7cc65
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that simply adds the records into 
batches of {@link
+ * java.sql.PreparedStatement} and doesn't buffer records in memory. Only used 
in Table/SQL API.
+ * Supported executors:TableBufferedStatementExecutor, 
TableBufferReducedStatementExecutor, TableSimpleStatementExecutor
+ */
+public final class TableMetricStatementExecutor implements 
JdbcBatchStatementExecutor<RowData> {
+
+    private static final Pattern pattern = Pattern.compile("Batch entry 
(\\d+)");
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableMetricStatementExecutor.class);
+    private final StatementFactory stmtFactory;
+    private final JdbcRowConverter converter;
+    private List<RowData> batch;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+    private final SinkMetricData sinkMetricData;
+    private final AtomicInteger counter = new AtomicInteger();
+    private transient FieldNamedPreparedStatement st;
+    private boolean multipleSink;
+    private String label;
+    private String logtag;
+    private String identifier;
+    private Function<RowData, RowData> valueTransform = null;
+    // counters used for table level metric calculation for multiple sink
+    public long[] metric = new long[4];
+
+    public TableMetricStatementExecutor(StatementFactory stmtFactory, 
JdbcRowConverter converter,
+            DirtySinkHelper<Object> dirtySinkHelper, SinkMetricData 
sinkMetricData) {
+        this.stmtFactory = checkNotNull(stmtFactory);
+        this.converter = checkNotNull(converter);
+        this.batch = new CopyOnWriteArrayList<>();
+        this.dirtySinkHelper = dirtySinkHelper;
+        this.sinkMetricData = sinkMetricData;
+    }
+
+    public void setDirtyMetaData(String label, String logtag, String 
identifier) {
+        this.label = label;
+        this.logtag = logtag;
+        this.identifier = identifier;
+    }
+
+    public void setMultipleSink(boolean multipleSink) {
+        this.multipleSink = multipleSink;
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        st = stmtFactory.createStatement(connection);
+    }
+
+    public void setValueTransform(Function<RowData, RowData> valueTransform) {
+        this.valueTransform = valueTransform;
+    }
+
+    @Override
+    public void addToBatch(RowData record) throws SQLException {
+        if (valueTransform != null) {
+            record = valueTransform.apply(record); // copy or not
+        }
+        batch.add(record);
+        converter.toExternal(record, st);
+        st.addBatch();
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        try {
+            st.executeBatch();
+
+            long writtenSize = batch.size();
+            // approximate since it may be inefficient to iterate over all 
writtenSize-1 elements.
+            long writtenBytes = 0L;
+            if (writtenSize > 0) {
+                writtenBytes = (long) 
batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+            }
+            batch.clear();
+            if (!multipleSink) {
+                sinkMetricData.invoke(writtenSize, writtenBytes);
+            } else {
+                metric[0] += writtenSize;
+                metric[1] += writtenBytes;
+            }
+
+        } catch (SQLException e) {
+            // clear the prepared statement first to avoid exceptions
+            st.clearParameters();
+            try {
+                processErrorPosition(e);
+            } catch (Exception ex) {
+                try {
+                    retryEntireBatch();
+                } catch (JsonProcessingException exc) {
+                    LOG.error("dirty data archive failed");
+                }
+            }
+        }
+    }
+
+    private void processErrorPosition(SQLException e) throws SQLException {
+        List<Integer> errorPositions = parseError(e);
+        // the data before the first sqlexception are already written, handle 
those and remove them.
+        int writtenSize = errorPositions.get(0);
+        long writtenBytes = 0L;
+        if (writtenSize > 0) {
+            writtenBytes = (long) 
batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+        }
+        if (!multipleSink) {
+            sinkMetricData.invoke(writtenSize, writtenBytes);
+        } else {
+            metric[0] += writtenSize;
+            metric[1] += writtenBytes;
+        }
+
+        batch = batch.subList(writtenSize, batch.size());
+
+        // for the unwritten data, remove the dirty ones
+        for (int pos : errorPositions) {
+            pos -= writtenSize;
+            RowData record = batch.get(pos);
+            batch.remove(record);
+            invokeDirty(record, e);
+        }
+
+        // try to execute the supposedly clean batch, throw exception on 
failure
+        for (RowData record : batch) {
+            addToBatch(record);
+        }
+        st.executeBatch();
+        batch.clear();
+        st.clearParameters();
+    }
+
+    private void retryEntireBatch() throws SQLException, 
JsonProcessingException {
+        // clear parameters to make sure the batch is always clean in the end.
+        st.clearParameters();
+        for (RowData rowData : batch) {
+            try {
+                converter.toExternal(rowData, st);
+                st.addBatch();
+                st.executeBatch();
+                if (!multipleSink) {
+                    sinkMetricData.invoke(1, 
rowData.toString().getBytes().length);
+                } else {
+                    metric[0] += 1;
+                    metric[1] += rowData.toString().getBytes().length;
+                }
+            } catch (Exception e) {
+                st.clearParameters();
+                invokeDirty(rowData, e);
+            }
+        }
+        batch.clear();
+        st.clearParameters();
+    }
+
+    private void invokeDirty(RowData rowData, Exception e) {
+        if (!multipleSink) {
+            if (dirtySinkHelper != null) {
+                dirtySinkHelper.invoke(rowData.toString(), 
DirtyType.BATCH_LOAD_ERROR, e);
+            }
+            sinkMetricData.invokeDirty(1, 
rowData.toString().getBytes().length);
+        } else {
+            if (dirtySinkHelper != null) {
+                dirtySinkHelper.invoke(rowData.toString(), 
DirtyType.BATCH_LOAD_ERROR, label, logtag, identifier, e);
+            }
+            metric[2] += 1;
+            metric[3] += rowData.toString().getBytes().length;
+        }
+    }
+
+    private List<Integer> parseError(SQLException e) throws SQLException {
+        List<Integer> errors = new ArrayList<>();
+        int pos = getPosFromMessage(e.getMessage());
+        if (pos != -1) {
+            errors.add(getPosFromMessage(e.getMessage()));
+        } else {
+            throw new SQLException(e);
+        }
+        SQLException next = e.getNextException();
+        if (next != null) {
+            errors.addAll(parseError(next));
+        }
+        return errors;
+    }
+
+    private int getPosFromMessage(String message) {
+        Matcher matcher = pattern.matcher(message);
+        if (matcher.find()) {
+            int pos = Integer.parseInt(matcher.group(1));
+            // duplicate key is a special case,can't just return the first 
instance
+            if (message.contains("duplicate key")) {
+                return -1;
+            }
+            return pos;
+        }
+        LOG.error("The dirty message {} can't be parsed", message);
+        return -1;
+    }
+
+    @Override
+    public void closeStatements() throws SQLException {
+        if (st != null) {
+            st.close();
+            st = null;
+        }
+    }
+}


Reply via email to