This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 31f2a23f4 [INLONG-6780][Sort] Supports dirty data side-output for JDBC 
connector and related sinks (#6752)
31f2a23f4 is described below

commit 31f2a23f4bf322d6bbf36d463e17a4dc11602410
Author: Yizhou Yang <[email protected]>
AuthorDate: Fri Dec 16 14:35:24 2022 +0800

    [INLONG-6780][Sort] Supports dirty data side-output for JDBC connector and 
related sinks (#6752)
---
 .../jdbc/internal/JdbcBatchingOutputFormat.java    | 79 +++++++++++++++++++---
 .../jdbc/internal/TableJdbcUpsertOutputFormat.java | 27 ++++++--
 .../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 23 ++++++-
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   | 14 +++-
 .../sort/jdbc/table/JdbcDynamicTableSink.java      | 20 +++++-
 .../DorisExtractNodeToMySqlLoadNodeTest.java       | 12 +++-
 .../inlong/sort/parser/MySqlLoadSqlParseTest.java  | 10 ++-
 .../inlong/sort/parser/OracleLoadSqlParseTest.java | 10 ++-
 .../parser/PostgresLoadNodeFlinkSqlParseTest.java  | 10 ++-
 .../sort/parser/SqlServerNodeSqlParseTest.java     | 10 ++-
 10 files changed, 186 insertions(+), 29 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 63c5eaa5b..f8daf1bc7 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
 import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
@@ -37,6 +38,10 @@ import 
org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
@@ -46,6 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -92,19 +98,26 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
     private Long dataSize = 0L;
     private Long rowSize = 0L;
 
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
+
     public JdbcBatchingOutputFormat(
             @Nonnull JdbcConnectionProvider connectionProvider,
             @Nonnull JdbcExecutionOptions executionOptions,
             @Nonnull StatementExecutorFactory<JdbcExec> 
statementExecutorFactory,
             @Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         super(connectionProvider);
         this.executionOptions = checkNotNull(executionOptions);
         this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
         this.jdbcRecordExtractor = checkNotNull(recordExtractor);
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     public static Builder builder() {
@@ -146,6 +159,13 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
         if (metricOption != null) {
             sinkMetricData = new SinkMetricData(metricOption, 
runtimeContext.getMetricGroup());
         }
+        if (dirtySink != null) {
+            try {
+                dirtySink.open(new Configuration());
+            } catch (Exception e) {
+                throw new IOException("failed to open dirty sink");
+            }
+        }
         jdbcStatementExecutor = 
createAndOpenStatementExecutor(statementExecutorFactory);
         if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
             this.scheduler =
@@ -195,6 +215,38 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
         }
     }
 
+    void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
+        if (!dirtyOptions.ignoreDirty()) {
+            RuntimeException ex;
+            if (e instanceof RuntimeException) {
+                ex = (RuntimeException) e;
+            } else {
+                ex = new RuntimeException(e);
+            }
+            throw ex;
+        }
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeDirty(rowSize, dataSize);
+        }
+        if (dirtySink != null) {
+            DirtyData.Builder<Object> builder = DirtyData.builder();
+            try {
+                builder.setData(dirtyData)
+                        .setDirtyType(dirtyType)
+                        .setLabels(dirtyOptions.getLabels())
+                        .setLogTag(dirtyOptions.getLogTag())
+                        .setDirtyMessage(e.getMessage())
+                        .setIdentifier(dirtyOptions.getIdentifier());
+                dirtySink.invoke(builder.build());
+            } catch (Exception ex) {
+                if (!dirtyOptions.ignoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
+                }
+                LOG.warn("Dirty sink failed", ex);
+            }
+        }
+    }
+
     @Override
     public final synchronized void writeRecord(In record) throws IOException {
         checkFlushException();
@@ -213,11 +265,9 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
                 resetStateAfterFlush();
             }
         } catch (Exception e) {
-            if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(rowSize, dataSize);
-            }
+            LOG.error(String.format("jdbc batch write record error, raw data: 
%s", record), e);
+            handleDirtyData(record, DirtyType.EXTRACT_ROWDATA_ERROR, e);
             resetStateAfterFlush();
-            throw new IOException("Writing records to JDBC failed.", e);
         }
     }
 
@@ -226,8 +276,13 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
         rowSize = 0L;
     }
 
-    protected void addToBatch(In original, JdbcIn extracted) throws 
SQLException {
-        jdbcStatementExecutor.addToBatch(extracted);
+    protected void addToBatch(In original, JdbcIn extracted) {
+        try {
+            jdbcStatementExecutor.addToBatch(extracted);
+        } catch (Exception e) {
+            LOG.error(String.format("DataTypeMappingError, data: %s", 
extracted), e);
+            handleDirtyData(extracted, DirtyType.DATA_TYPE_MAPPING_ERROR, e);
+        }
     }
 
     @Override
@@ -372,6 +427,8 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
         private String auditHostAndPorts;
         private JdbcExecutionOptions.Builder executionOptionsBuilder =
                 JdbcExecutionOptions.builder();
+        private DirtyOptions dirtyOptions;
+        private DirtySink<Object> dirtySink;
 
         /**
          * required, jdbc options.
@@ -468,7 +525,9 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
                         dml,
                         executionOptionsBuilder.build(),
                         inlongMetric,
-                        auditHostAndPorts);
+                        auditHostAndPorts,
+                        dirtyOptions,
+                        dirtySink);
             } else {
                 // warn: don't close over builder fields
                 String sql =
@@ -489,7 +548,9 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
                             return tuple2.f1;
                         },
                         inlongMetric,
-                        auditHostAndPorts);
+                        auditHostAndPorts,
+                        dirtyOptions,
+                        dirtySink);
             }
         }
     }
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
index fa007e72f..b504e2901 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -27,9 +27,13 @@ import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecu
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
 import org.apache.flink.types.Row;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -58,14 +62,18 @@ class TableJdbcUpsertOutputFormat
             JdbcDmlOptions dmlOptions,
             JdbcExecutionOptions batchOptions,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this(
                 connectionProvider,
                 batchOptions,
                 ctx -> createUpsertRowExecutor(dmlOptions, ctx),
                 ctx -> createDeleteExecutor(dmlOptions, ctx),
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                dirtyOptions,
+                dirtySink);
     }
 
     @VisibleForTesting
@@ -75,9 +83,11 @@ class TableJdbcUpsertOutputFormat
             StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> 
statementExecutorFactory,
             StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> 
deleteStatementExecutorFactory,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         super(connectionProvider, batchOptions, statementExecutorFactory, 
tuple2 -> tuple2.f1,
-                inlongMetric, auditHostAndPorts);
+                inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
         this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
     }
 
@@ -178,11 +188,16 @@ class TableJdbcUpsertOutputFormat
     }
 
     @Override
-    protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) 
throws SQLException {
+    protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) {
         if (original.f0) {
             super.addToBatch(original, extracted);
         } else {
-            deleteExecutor.addToBatch(extracted);
+            try {
+                deleteExecutor.addToBatch(extracted);
+            } catch (Exception e) {
+                LOG.error(String.format("DataTypeMappingError, data: %s", 
extracted), e);
+                handleDirtyData(extracted, DirtyType.DATA_TYPE_MAPPING_ERROR, 
e);
+            }
         }
     }
 
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index a45d2704a..4a0bfa228 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat;
 
 import java.io.Serializable;
@@ -65,6 +67,8 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
     private DataType[] fieldDataTypes;
     private String inlongMetric;
     private String auditHostAndPorts;
+    private DirtyOptions dirtyOptions;
+    private DirtySink<Object> dirtySink;
 
     public JdbcDynamicOutputFormatBuilder() {
 
@@ -91,7 +95,6 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
                 ctx.getExecutionConfig().isObjectReuseEnabled()
                         ? typeSerializer::copy
                         : Function.identity();
-
         return new TableBufferReducedStatementExecutor(
                 createUpsertRowExecutor(
                         dialect,
@@ -240,6 +243,16 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
         return this;
     }
 
+    public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions 
dirtyOptions) {
+        this.dirtyOptions = dirtyOptions;
+        return this;
+    }
+
+    public JdbcDynamicOutputFormatBuilder setDirtySink(DirtySink<Object> 
dirtySink) {
+        this.dirtySink = dirtySink;
+        return this;
+    }
+
     public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
         checkNotNull(jdbcOptions, "jdbc options can not be null");
         checkNotNull(dmlOptions, "jdbc dml options can not be null");
@@ -258,7 +271,9 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
                             dmlOptions, ctx, rowDataTypeInformation, 
logicalTypes),
                     JdbcBatchingOutputFormat.RecordExtractor.identity(),
                     inlongMetric,
-                    auditHostAndPorts);
+                    auditHostAndPorts,
+                    dirtyOptions,
+                    dirtySink);
         } else {
             // append only query
             final String sql =
@@ -278,7 +293,9 @@ public class JdbcDynamicOutputFormatBuilder implements 
Serializable {
                             rowDataTypeInformation),
                     JdbcBatchingOutputFormat.RecordExtractor.identity(),
                     inlongMetric,
-                    auditHostAndPorts);
+                    auditHostAndPorts,
+                    dirtyOptions,
+                    dirtySink);
         }
     }
 }
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 6756a1d3f..533224f00 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -41,9 +41,14 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.base.util.JdbcUrlUtils;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
@@ -183,7 +188,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                 FactoryUtil.createTableFactoryHelper(this, context);
         final ReadableConfig config = helper.getOptions();
 
-        helper.validate();
+        helper.validateExcept(DIRTY_PREFIX);
         validateConfigOptions(config);
         JdbcOptions jdbcOptions = getJdbcOptions(config);
         TableSchema physicalSchema =
@@ -191,6 +196,9 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         boolean appendMode = config.get(SINK_APPEND_MODE);
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = 
config.getOptional(INLONG_AUDIT).orElse(null);
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(helper.getOptions());
+        final DirtySink<Object> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return new JdbcDynamicTableSink(
                 jdbcOptions,
                 getJdbcExecutionOptions(config),
@@ -198,7 +206,9 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                 physicalSchema,
                 appendMode,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                dirtyOptions,
+                dirtySink);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index 975bbd69c..e214e61d1 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -28,8 +28,11 @@ import 
org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -53,6 +56,9 @@ public class JdbcDynamicTableSink implements DynamicTableSink 
{
     private final String auditHostAndPorts;
     private final boolean appendMode;
 
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
+
     public JdbcDynamicTableSink(
             JdbcOptions jdbcOptions,
             JdbcExecutionOptions executionOptions,
@@ -60,7 +66,9 @@ public class JdbcDynamicTableSink implements DynamicTableSink 
{
             TableSchema tableSchema,
             boolean appendMode,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.jdbcOptions = jdbcOptions;
         this.executionOptions = executionOptions;
         this.dmlOptions = dmlOptions;
@@ -69,6 +77,8 @@ public class JdbcDynamicTableSink implements DynamicTableSink 
{
         this.appendMode = appendMode;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -102,6 +112,8 @@ public class JdbcDynamicTableSink implements 
DynamicTableSink {
         builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
         builder.setInLongMetric(inlongMetric);
         builder.setAuditHostAndPorts(auditHostAndPorts);
+        builder.setDirtyOptions(dirtyOptions);
+        builder.setDirtySink(dirtySink);
         return SinkFunctionProvider.of(
                 new GenericJdbcSinkFunction<>(builder.build()), 
jdbcOptions.getParallelism());
     }
@@ -109,7 +121,7 @@ public class JdbcDynamicTableSink implements 
DynamicTableSink {
     @Override
     public DynamicTableSink copy() {
         return new JdbcDynamicTableSink(jdbcOptions, executionOptions, 
dmlOptions,
-                tableSchema, appendMode, inlongMetric, auditHostAndPorts);
+                tableSchema, appendMode, inlongMetric, auditHostAndPorts, 
dirtyOptions, dirtySink);
     }
 
     @Override
@@ -132,7 +144,9 @@ public class JdbcDynamicTableSink implements 
DynamicTableSink {
                 && Objects.equals(tableSchema, that.tableSchema)
                 && Objects.equals(dialectName, that.dialectName)
                 && Objects.equals(inlongMetric, that.inlongMetric)
-                && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
+                && Objects.equals(auditHostAndPorts, that.auditHostAndPorts)
+                && Objects.equals(dirtyOptions, that.dirtyOptions)
+                && Objects.equals(dirtySink, that.dirtySink);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
index 6ff6d5a25..e556562b0 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
@@ -43,7 +43,9 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -89,9 +91,15 @@ public class DorisExtractNodeToMySqlLoadNodeTest extends 
AbstractTestBase {
                                 new FieldInfo("sale", new 
DoubleFormatInfo())));
 
         List<FilterFunction> filters = new ArrayList<>();
-
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
         return new MySqlLoadNode("2", "mysql_output", fields, fieldRelations, 
filters,
-                null, null, null, "jdbc:mysql://localhost:3306/inlong",
+                null, null, properties, "jdbc:mysql://localhost:3306/inlong",
                 "inlong", "inlong", "table_output", null);
     }
 
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 2d623a9d3..31d7d4aa6 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -72,8 +73,15 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
                                 new FieldInfo("name", new StringFormatInfo())),
                         new FieldRelation(new FieldInfo("age", new 
IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
         return new MySqlLoadNode("2", "mysql_output", fields, relations, null,
-                null, null, null, "jdbc:mysql://localhost:3306/inlong",
+                null, null, properties, "jdbc:mysql://localhost:3306/inlong",
                 "inlong", "inlong", "table_output", "id");
     }
 
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
index 9fcbbb174..728d71fac 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -72,8 +73,15 @@ public class OracleLoadSqlParseTest extends AbstractTestBase 
{
                                 new FieldInfo("NAME", new StringFormatInfo())),
                         new FieldRelation(new FieldInfo("age", new 
IntFormatInfo()),
                                 new FieldInfo("AGE", new IntFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=test&table=test2");
         return new OracleLoadNode("2", "oracle_output", fields, relations, 
null,
-                null, null, null, "jdbc:oracle:thin:@localhost:1521:xe",
+                null, null, properties, "jdbc:oracle:thin:@localhost:1521:xe",
                 "flinkuser", "flinkpw", "student", "ID");
     }
 
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
index f5bf995d9..ccc43ea7d 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
@@ -39,6 +39,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -70,13 +71,20 @@ public class PostgresLoadNodeFlinkSqlParseTest extends 
AbstractTestBase {
      * @return postgres load node
      */
     private PostgresLoadNode buildPostgresLoadNode() {
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=test&table=test2");
         return new PostgresLoadNode("2", "postgres_output", Arrays.asList(new 
FieldInfo("name",
                 new StringFormatInfo()), new FieldInfo("age", new 
IntFormatInfo())),
                 Arrays.asList(new FieldRelation(new FieldInfo("name", new 
StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
                         new FieldRelation(new FieldInfo("age", new 
IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo()))),
-                null, null, 1, null,
+                null, null, 1, properties,
                 "jdbc:postgresql://localhost:5432/postgres",
                 "postgres",
                 "inlong",
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
index 1ae060cdf..61c82f23e 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -130,8 +131,15 @@ public class SqlServerNodeSqlParseTest extends 
AbstractTestBase {
                         new FieldInfo("id", new LongFormatInfo())),
                         new FieldRelation(new FieldInfo("name", new 
StringFormatInfo()),
                                 new FieldInfo("name", new 
StringFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
         return new SqlServerLoadNode(id, "sqlserver_out", fields, relations, 
null, null, 1,
-                null, 
"jdbc:sqlserver://localhost:1433;databaseName=column_type_test", "SA",
+                properties, 
"jdbc:sqlserver://localhost:1433;databaseName=column_type_test", "SA",
                 "INLONG*123", "dbo", "work1", "id");
     }
 

Reply via email to