This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 31f2a23f4 [INLONG-6780][Sort] Supports dirty data side-output for JDBC
connector and related sinks (#6752)
31f2a23f4 is described below
commit 31f2a23f4bf322d6bbf36d463e17a4dc11602410
Author: Yizhou Yang <[email protected]>
AuthorDate: Fri Dec 16 14:35:24 2022 +0800
[INLONG-6780][Sort] Supports dirty data side-output for JDBC connector and
related sinks (#6752)
---
.../jdbc/internal/JdbcBatchingOutputFormat.java | 79 +++++++++++++++++++---
.../jdbc/internal/TableJdbcUpsertOutputFormat.java | 27 ++++++--
.../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 23 ++++++-
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 14 +++-
.../sort/jdbc/table/JdbcDynamicTableSink.java | 20 +++++-
.../DorisExtractNodeToMySqlLoadNodeTest.java | 12 +++-
.../inlong/sort/parser/MySqlLoadSqlParseTest.java | 10 ++-
.../inlong/sort/parser/OracleLoadSqlParseTest.java | 10 ++-
.../parser/PostgresLoadNodeFlinkSqlParseTest.java | 10 ++-
.../sort/parser/SqlServerNodeSqlParseTest.java | 10 ++-
10 files changed, 186 insertions(+), 29 deletions(-)
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 63c5eaa5b..f8daf1bc7 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
@@ -37,6 +38,10 @@ import
org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
@@ -46,6 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
@@ -92,19 +98,26 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
private Long dataSize = 0L;
private Long rowSize = 0L;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
+
public JdbcBatchingOutputFormat(
@Nonnull JdbcConnectionProvider connectionProvider,
@Nonnull JdbcExecutionOptions executionOptions,
@Nonnull StatementExecutorFactory<JdbcExec>
statementExecutorFactory,
@Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
super(connectionProvider);
this.executionOptions = checkNotNull(executionOptions);
this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
this.jdbcRecordExtractor = checkNotNull(recordExtractor);
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
public static Builder builder() {
@@ -146,6 +159,13 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
if (metricOption != null) {
sinkMetricData = new SinkMetricData(metricOption,
runtimeContext.getMetricGroup());
}
+ if (dirtySink != null) {
+ try {
+ dirtySink.open(new Configuration());
+ } catch (Exception e) {
+ throw new IOException("failed to open dirty sink");
+ }
+ }
jdbcStatementExecutor =
createAndOpenStatementExecutor(statementExecutorFactory);
if (executionOptions.getBatchIntervalMs() != 0 &&
executionOptions.getBatchSize() != 1) {
this.scheduler =
@@ -195,6 +215,38 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
}
}
+ void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
+ if (!dirtyOptions.ignoreDirty()) {
+ RuntimeException ex;
+ if (e instanceof RuntimeException) {
+ ex = (RuntimeException) e;
+ } else {
+ ex = new RuntimeException(e);
+ }
+ throw ex;
+ }
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setDirtyMessage(e.getMessage())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+
@Override
public final synchronized void writeRecord(In record) throws IOException {
checkFlushException();
@@ -213,11 +265,9 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
resetStateAfterFlush();
}
} catch (Exception e) {
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(rowSize, dataSize);
- }
+ LOG.error(String.format("jdbc batch write record error, raw data:
%s", record), e);
+ handleDirtyData(record, DirtyType.EXTRACT_ROWDATA_ERROR, e);
resetStateAfterFlush();
- throw new IOException("Writing records to JDBC failed.", e);
}
}
@@ -226,8 +276,13 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
rowSize = 0L;
}
- protected void addToBatch(In original, JdbcIn extracted) throws
SQLException {
- jdbcStatementExecutor.addToBatch(extracted);
+ protected void addToBatch(In original, JdbcIn extracted) {
+ try {
+ jdbcStatementExecutor.addToBatch(extracted);
+ } catch (Exception e) {
+ LOG.error(String.format("DataTypeMappingError, data: %s",
extracted), e);
+ handleDirtyData(extracted, DirtyType.DATA_TYPE_MAPPING_ERROR, e);
+ }
}
@Override
@@ -372,6 +427,8 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
private String auditHostAndPorts;
private JdbcExecutionOptions.Builder executionOptionsBuilder =
JdbcExecutionOptions.builder();
+ private DirtyOptions dirtyOptions;
+ private DirtySink<Object> dirtySink;
/**
* required, jdbc options.
@@ -468,7 +525,9 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
dml,
executionOptionsBuilder.build(),
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
} else {
// warn: don't close over builder fields
String sql =
@@ -489,7 +548,9 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
return tuple2.f1;
},
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
}
}
}
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
index fa007e72f..b504e2901 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -27,9 +27,13 @@ import
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecu
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.types.Row;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
@@ -58,14 +62,18 @@ class TableJdbcUpsertOutputFormat
JdbcDmlOptions dmlOptions,
JdbcExecutionOptions batchOptions,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this(
connectionProvider,
batchOptions,
ctx -> createUpsertRowExecutor(dmlOptions, ctx),
ctx -> createDeleteExecutor(dmlOptions, ctx),
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
}
@VisibleForTesting
@@ -75,9 +83,11 @@ class TableJdbcUpsertOutputFormat
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
statementExecutorFactory,
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
deleteStatementExecutorFactory,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
super(connectionProvider, batchOptions, statementExecutorFactory,
tuple2 -> tuple2.f1,
- inlongMetric, auditHostAndPorts);
+ inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
}
@@ -178,11 +188,16 @@ class TableJdbcUpsertOutputFormat
}
@Override
- protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted)
throws SQLException {
+ protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) {
if (original.f0) {
super.addToBatch(original, extracted);
} else {
- deleteExecutor.addToBatch(extracted);
+ try {
+ deleteExecutor.addToBatch(extracted);
+ } catch (Exception e) {
+ LOG.error(String.format("DataTypeMappingError, data: %s",
extracted), e);
+ handleDirtyData(extracted, DirtyType.DATA_TYPE_MAPPING_ERROR,
e);
+ }
}
}
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index a45d2704a..4a0bfa228 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat;
import java.io.Serializable;
@@ -65,6 +67,8 @@ public class JdbcDynamicOutputFormatBuilder implements
Serializable {
private DataType[] fieldDataTypes;
private String inlongMetric;
private String auditHostAndPorts;
+ private DirtyOptions dirtyOptions;
+ private DirtySink<Object> dirtySink;
public JdbcDynamicOutputFormatBuilder() {
@@ -91,7 +95,6 @@ public class JdbcDynamicOutputFormatBuilder implements
Serializable {
ctx.getExecutionConfig().isObjectReuseEnabled()
? typeSerializer::copy
: Function.identity();
-
return new TableBufferReducedStatementExecutor(
createUpsertRowExecutor(
dialect,
@@ -240,6 +243,16 @@ public class JdbcDynamicOutputFormatBuilder implements
Serializable {
return this;
}
+ public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions
dirtyOptions) {
+ this.dirtyOptions = dirtyOptions;
+ return this;
+ }
+
+ public JdbcDynamicOutputFormatBuilder setDirtySink(DirtySink<Object>
dirtySink) {
+ this.dirtySink = dirtySink;
+ return this;
+ }
+
public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
checkNotNull(jdbcOptions, "jdbc options can not be null");
checkNotNull(dmlOptions, "jdbc dml options can not be null");
@@ -258,7 +271,9 @@ public class JdbcDynamicOutputFormatBuilder implements
Serializable {
dmlOptions, ctx, rowDataTypeInformation,
logicalTypes),
JdbcBatchingOutputFormat.RecordExtractor.identity(),
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
} else {
// append only query
final String sql =
@@ -278,7 +293,9 @@ public class JdbcDynamicOutputFormatBuilder implements
Serializable {
rowDataTypeInformation),
JdbcBatchingOutputFormat.RecordExtractor.identity(),
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
}
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 6756a1d3f..533224f00 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -41,9 +41,14 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.base.util.JdbcUrlUtils;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -183,7 +188,7 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig config = helper.getOptions();
- helper.validate();
+ helper.validateExcept(DIRTY_PREFIX);
validateConfigOptions(config);
JdbcOptions jdbcOptions = getJdbcOptions(config);
TableSchema physicalSchema =
@@ -191,6 +196,9 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
boolean appendMode = config.get(SINK_APPEND_MODE);
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts =
config.getOptional(INLONG_AUDIT).orElse(null);
+ // Build the dirty data side-output
+ final DirtyOptions dirtyOptions =
DirtyOptions.fromConfig(helper.getOptions());
+ final DirtySink<Object> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
return new JdbcDynamicTableSink(
jdbcOptions,
getJdbcExecutionOptions(config),
@@ -198,7 +206,9 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
physicalSchema,
appendMode,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
}
@Override
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index 975bbd69c..e214e61d1 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -28,8 +28,11 @@ import
org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
+import javax.annotation.Nullable;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkState;
@@ -53,6 +56,9 @@ public class JdbcDynamicTableSink implements DynamicTableSink
{
private final String auditHostAndPorts;
private final boolean appendMode;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
+
public JdbcDynamicTableSink(
JdbcOptions jdbcOptions,
JdbcExecutionOptions executionOptions,
@@ -60,7 +66,9 @@ public class JdbcDynamicTableSink implements DynamicTableSink
{
TableSchema tableSchema,
boolean appendMode,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.jdbcOptions = jdbcOptions;
this.executionOptions = executionOptions;
this.dmlOptions = dmlOptions;
@@ -69,6 +77,8 @@ public class JdbcDynamicTableSink implements DynamicTableSink
{
this.appendMode = appendMode;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -102,6 +112,8 @@ public class JdbcDynamicTableSink implements
DynamicTableSink {
builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
builder.setInLongMetric(inlongMetric);
builder.setAuditHostAndPorts(auditHostAndPorts);
+ builder.setDirtyOptions(dirtyOptions);
+ builder.setDirtySink(dirtySink);
return SinkFunctionProvider.of(
new GenericJdbcSinkFunction<>(builder.build()),
jdbcOptions.getParallelism());
}
@@ -109,7 +121,7 @@ public class JdbcDynamicTableSink implements
DynamicTableSink {
@Override
public DynamicTableSink copy() {
return new JdbcDynamicTableSink(jdbcOptions, executionOptions,
dmlOptions,
- tableSchema, appendMode, inlongMetric, auditHostAndPorts);
+ tableSchema, appendMode, inlongMetric, auditHostAndPorts,
dirtyOptions, dirtySink);
}
@Override
@@ -132,7 +144,9 @@ public class JdbcDynamicTableSink implements
DynamicTableSink {
&& Objects.equals(tableSchema, that.tableSchema)
&& Objects.equals(dialectName, that.dialectName)
&& Objects.equals(inlongMetric, that.inlongMetric)
- && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
+ && Objects.equals(auditHostAndPorts, that.auditHostAndPorts)
+ && Objects.equals(dirtyOptions, that.dirtyOptions)
+ && Objects.equals(dirtySink, that.dirtySink);
}
@Override
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
index 6ff6d5a25..e556562b0 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractNodeToMySqlLoadNodeTest.java
@@ -43,7 +43,9 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -89,9 +91,15 @@ public class DorisExtractNodeToMySqlLoadNodeTest extends
AbstractTestBase {
new FieldInfo("sale", new
DoubleFormatInfo())));
List<FilterFunction> filters = new ArrayList<>();
-
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
return new MySqlLoadNode("2", "mysql_output", fields, fieldRelations,
filters,
- null, null, null, "jdbc:mysql://localhost:3306/inlong",
+ null, null, properties, "jdbc:mysql://localhost:3306/inlong",
"inlong", "inlong", "table_output", null);
}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 2d623a9d3..31d7d4aa6 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -72,8 +73,15 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo())),
new FieldRelation(new FieldInfo("age", new
IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
return new MySqlLoadNode("2", "mysql_output", fields, relations, null,
- null, null, null, "jdbc:mysql://localhost:3306/inlong",
+ null, null, properties, "jdbc:mysql://localhost:3306/inlong",
"inlong", "inlong", "table_output", "id");
}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
index 9fcbbb174..728d71fac 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -72,8 +73,15 @@ public class OracleLoadSqlParseTest extends AbstractTestBase
{
new FieldInfo("NAME", new StringFormatInfo())),
new FieldRelation(new FieldInfo("age", new
IntFormatInfo()),
new FieldInfo("AGE", new IntFormatInfo())));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=test&table=test2");
return new OracleLoadNode("2", "oracle_output", fields, relations,
null,
- null, null, null, "jdbc:oracle:thin:@localhost:1521:xe",
+ null, null, properties, "jdbc:oracle:thin:@localhost:1521:xe",
"flinkuser", "flinkpw", "student", "ID");
}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
index f5bf995d9..ccc43ea7d 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
@@ -39,6 +39,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -70,13 +71,20 @@ public class PostgresLoadNodeFlinkSqlParseTest extends
AbstractTestBase {
* @return postgres load node
*/
private PostgresLoadNode buildPostgresLoadNode() {
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=test&table=test2");
return new PostgresLoadNode("2", "postgres_output", Arrays.asList(new
FieldInfo("name",
new StringFormatInfo()), new FieldInfo("age", new
IntFormatInfo())),
Arrays.asList(new FieldRelation(new FieldInfo("name", new
StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
new FieldRelation(new FieldInfo("age", new
IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo()))),
- null, null, 1, null,
+ null, null, 1, properties,
"jdbc:postgresql://localhost:5432/postgres",
"postgres",
"inlong",
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
index 1ae060cdf..61c82f23e 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -130,8 +131,15 @@ public class SqlServerNodeSqlParseTest extends
AbstractTestBase {
new FieldInfo("id", new LongFormatInfo())),
new FieldRelation(new FieldInfo("name", new
StringFormatInfo()),
new FieldInfo("name", new
StringFormatInfo())));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
return new SqlServerLoadNode(id, "sqlserver_out", fields, relations,
null, null, 1,
- null,
"jdbc:sqlserver://localhost:1433;databaseName=column_type_test", "SA",
+ properties,
"jdbc:sqlserver://localhost:1433;databaseName=column_type_test", "SA",
"INLONG*123", "dbo", "work1", "id");
}