This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3291aeb26 [INLONG-6819][Sort] Add multi-table sink for PostgreSQL (#6820)
3291aeb26 is described below
commit 3291aeb260cdbea5bdba86f9a044dc08fd157d2a
Author: kuansix <[email protected]>
AuthorDate: Fri Dec 16 19:22:18 2022 +0800
[INLONG-6819][Sort] Add multi-table sink for PostgreSQL (#6820)
---
.../JdbcMultiBatchingComm.java} | 139 +----
.../internal/JdbcMultiBatchingOutputFormat.java | 639 +++++++++++++++++++++
.../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 52 ++
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 61 +-
.../sort/jdbc/table/JdbcDynamicTableSink.java | 47 +-
5 files changed, 797 insertions(+), 141 deletions(-)
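For readers who want to try the feature end to end: the new multi-table mode is driven purely by connector options. Below is a hypothetical Flink SQL DDL, held in a small runnable Java class for illustration; the connector identifier and the format value are assumptions on my part, while the 'sink.multiple.*' keys come from the factory changes in this commit.

    // Hypothetical usage of the options introduced by this commit, written as
    // a Flink SQL DDL held in a Java string. 'jdbc-inlong' and 'canal-json'
    // are assumed values, not taken from this diff.
    public class MultiSinkDdlExample {

        static final String DDL = String.join("\n",
                "CREATE TABLE pg_multi_sink (",
                "    raw BYTES",
                ") WITH (",
                "    'connector' = 'jdbc-inlong',  -- assumed connector identifier",
                "    'url' = 'jdbc:postgresql://localhost:5432/postgres',",
                "    'username' = 'postgres',",
                "    'password' = 'postgres',",
                "    'table-name' = 'unused_in_multiple_mode',",
                "    'sink.multiple.enable' = 'true',",
                "    'sink.multiple.format' = 'canal-json',  -- assumed supported format",
                "    'sink.multiple.database-pattern' = '${database}',",
                "    'sink.multiple.schema-pattern' = '${schema}',",
                "    'sink.multiple.table-pattern' = '${table}'",
                ")");

        public static void main(String[] args) {
            System.out.println(DDL);
        }
    }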
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java
similarity index 60%
copy from inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
copy to inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java
index 4a0bfa228..b22ac81b4 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingComm.java
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.jdbc.table;
+package org.apache.inlong.sort.jdbc.internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
@@ -30,51 +28,24 @@ import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementE
import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor;
import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
-import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.inlong.sort.base.dirty.DirtyOptions;
-import org.apache.inlong.sort.base.dirty.sink.DirtySink;
-import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.function.Function;
import static org.apache.flink.table.data.RowData.createFieldGetter;
import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
- *
- * Builder for {@link JdbcBatchingOutputFormat} for Table/SQL.
- * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
+ * Common functions for a JDBC multi-table output format to get or create a JDBC executor.
*/
-public class JdbcDynamicOutputFormatBuilder implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private JdbcOptions jdbcOptions;
- private JdbcExecutionOptions executionOptions;
- private JdbcDmlOptions dmlOptions;
- private boolean appendMode;
- private TypeInformation<RowData> rowDataTypeInformation;
- private DataType[] fieldDataTypes;
- private String inlongMetric;
- private String auditHostAndPorts;
- private DirtyOptions dirtyOptions;
- private DirtySink<Object> dirtySink;
-
- public JdbcDynamicOutputFormatBuilder() {
+public class JdbcMultiBatchingComm {
- }
-
-    private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
+    public static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
JdbcDmlOptions opt,
RuntimeContext ctx,
TypeInformation<RowData> rowDataTypeInfo,
@@ -95,6 +66,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
ctx.getExecutionConfig().isObjectReuseEnabled()
? typeSerializer::copy
: Function.identity();
+
return new TableBufferReducedStatementExecutor(
createUpsertRowExecutor(
dialect,
@@ -109,7 +81,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
valueTransform);
}
-    private static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor(
+    public static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor(
RuntimeContext ctx,
JdbcDialect dialect,
String[] fieldNames,
@@ -201,101 +173,4 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
return pkRow;
}
- public JdbcDynamicOutputFormatBuilder setAppendMode(boolean appendMode) {
- this.appendMode = appendMode;
- return this;
- }
-
-    public JdbcDynamicOutputFormatBuilder setJdbcOptions(JdbcOptions jdbcOptions) {
- this.jdbcOptions = jdbcOptions;
- return this;
- }
-
- public JdbcDynamicOutputFormatBuilder setJdbcExecutionOptions(
- JdbcExecutionOptions executionOptions) {
- this.executionOptions = executionOptions;
- return this;
- }
-
-    public JdbcDynamicOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions dmlOptions) {
- this.dmlOptions = dmlOptions;
- return this;
- }
-
- public JdbcDynamicOutputFormatBuilder setRowDataTypeInfo(
- TypeInformation<RowData> rowDataTypeInfo) {
- this.rowDataTypeInformation = rowDataTypeInfo;
- return this;
- }
-
-    public JdbcDynamicOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) {
- this.fieldDataTypes = fieldDataTypes;
- return this;
- }
-
-    public JdbcDynamicOutputFormatBuilder setInLongMetric(String inlongMetric) {
- this.inlongMetric = inlongMetric;
- return this;
- }
-
-    public JdbcDynamicOutputFormatBuilder setAuditHostAndPorts(String auditHostAndPorts) {
- this.auditHostAndPorts = auditHostAndPorts;
- return this;
- }
-
-    public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions dirtyOptions) {
- this.dirtyOptions = dirtyOptions;
- return this;
- }
-
-    public JdbcDynamicOutputFormatBuilder setDirtySink(DirtySink<Object> dirtySink) {
- this.dirtySink = dirtySink;
- return this;
- }
-
- public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
- checkNotNull(jdbcOptions, "jdbc options can not be null");
- checkNotNull(dmlOptions, "jdbc dml options can not be null");
-        checkNotNull(executionOptions, "jdbc execution options can not be null");
-
- final LogicalType[] logicalTypes =
- Arrays.stream(fieldDataTypes)
- .map(DataType::getLogicalType)
- .toArray(LogicalType[]::new);
-        if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0 && !appendMode) {
- // upsert query
- return new JdbcBatchingOutputFormat<>(
- new SimpleJdbcConnectionProvider(jdbcOptions),
- executionOptions,
- ctx -> createBufferReduceExecutor(
-                            dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
- JdbcBatchingOutputFormat.RecordExtractor.identity(),
- inlongMetric,
- auditHostAndPorts,
- dirtyOptions,
- dirtySink);
- } else {
- // append only query
- final String sql =
- dmlOptions
- .getDialect()
- .getInsertIntoStatement(
-                                    dmlOptions.getTableName(), dmlOptions.getFieldNames());
- return new JdbcBatchingOutputFormat<>(
- new SimpleJdbcConnectionProvider(jdbcOptions),
- executionOptions,
- ctx -> createSimpleBufferedExecutor(
- ctx,
- dmlOptions.getDialect(),
- dmlOptions.getFieldNames(),
- logicalTypes,
- sql,
- rowDataTypeInformation),
- JdbcBatchingOutputFormat.RecordExtractor.identity(),
- inlongMetric,
- auditHostAndPorts,
- dirtyOptions,
- dirtySink);
- }
- }
-}
\ No newline at end of file
+}
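The helpers moved into JdbcMultiBatchingComm are chosen per table at runtime: an upsert (buffer-reduce) executor when primary keys are known and append mode is off, otherwise a plain buffered INSERT executor. A self-contained sketch of just that routing rule follows; the class, enum, and method names are mine, only the condition mirrors the diff.

    import java.util.Collections;
    import java.util.List;

    // Sketch of the executor-routing rule used by the multi-table sink; the
    // real code returns Flink JdbcBatchStatementExecutor instances instead of
    // an enum. Mirrors: CollectionUtils.isNotEmpty(pkNameList) && !appendMode
    public class ExecutorChoice {

        enum Kind { UPSERT_BUFFER_REDUCE, APPEND_BUFFERED_INSERT }

        static Kind choose(List<String> pkNames, boolean appendMode) {
            if (pkNames != null && !pkNames.isEmpty() && !appendMode) {
                return Kind.UPSERT_BUFFER_REDUCE;
            }
            return Kind.APPEND_BUFFERED_INSERT;
        }

        public static void main(String[] args) {
            System.out.println(choose(List.of("id"), false));           // UPSERT_BUFFER_REDUCE
            System.out.println(choose(Collections.emptyList(), false)); // APPEND_BUFFERED_INSERT
            System.out.println(choose(List.of("id"), true));            // APPEND_BUFFERED_INSERT
        }
    }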
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
new file mode 100644
index 000000000..069d41f31
--- /dev/null
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+
+/**
+ * A JDBC multi-table output format that batches records before writing them to databases.
+ * Add an option `inlong.metric` to support metrics.
+ */
+public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
+ extends
+ AbstractJdbcOutputFormat<In> {
+
+ private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcMultiBatchingOutputFormat.class);
+ private final JdbcExecutionOptions executionOptions;
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private transient int batchCount = 0;
+ private transient volatile boolean closed = false;
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
+ private transient RuntimeContext runtimeContext;
+ private JdbcDmlOptions dmlOptions;
+ private JdbcOptions jdbcOptions;
+ private boolean appendMode;
+ private transient Map<String, JdbcExec> jdbcExecMap = new HashMap<>();
+    private transient Map<String, SimpleJdbcConnectionProvider> connectionExecProviderMap = new HashMap<>();
+    private transient Map<String, RowType> rowTypeMap = new HashMap<>();
+    private transient Map<String, List<String>> pkNameMap = new HashMap<>();
+    private transient Map<String, List<GenericRowData>> recordsMap = new HashMap<>();
+    private transient Map<String, Exception> tableExceptionMap = new HashMap<>();
+ private transient Boolean isIgnoreTableException;
+
+ private transient ListState<MetricState> metricStateListState;
+ private final String sinkMultipleFormat;
+ private final String databasePattern;
+ private final String tablePattern;
+ private final String schemaPattern;
+ private transient MetricState metricState;
+ private SinkMetricData sinkMetricData;
+ private Long dataSize = 0L;
+ private Long rowSize = 0L;
+ private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
+
+ public JdbcMultiBatchingOutputFormat(
+ @Nonnull JdbcConnectionProvider connectionProvider,
+ @Nonnull JdbcExecutionOptions executionOptions,
+ @Nonnull JdbcDmlOptions dmlOptions,
+ @Nonnull boolean appendMode,
+ @Nonnull JdbcOptions jdbcOptions,
+ String sinkMultipleFormat,
+ String databasePattern,
+ String tablePattern,
+ String schemaPattern,
+ String inlongMetric,
+ String auditHostAndPorts,
+ SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+ super(connectionProvider);
+ this.executionOptions = checkNotNull(executionOptions);
+ this.dmlOptions = dmlOptions;
+ this.appendMode = appendMode;
+ this.jdbcOptions = jdbcOptions;
+ this.sinkMultipleFormat = sinkMultipleFormat;
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ this.schemaPattern = schemaPattern;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
+ }
+
+ /**
+ * Connects to the target database and initializes the prepared statement.
+ *
+ * @param taskNumber The number of the parallel instance.
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ this.runtimeContext = getRuntimeContext();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
+ }
+ jdbcExecMap = new HashMap<>();
+ connectionExecProviderMap = new HashMap<>();
+ pkNameMap = new HashMap<>();
+ rowTypeMap = new HashMap<>();
+ recordsMap = new HashMap<>();
+ tableExceptionMap = new HashMap<>();
+        isIgnoreTableException = schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
+                || schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+ this.scheduler =
+ Executors.newScheduledThreadPool(
+                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+ this.scheduledFuture =
+ this.scheduler.scheduleWithFixedDelay(
+ () -> {
+                                synchronized (JdbcMultiBatchingOutputFormat.this) {
+ if (!closed) {
+ try {
+ flush();
+ if (sinkMetricData != null) {
+                                            sinkMetricData.invoke(rowSize, dataSize);
+ }
+ resetStateAfterFlush();
+ } catch (Exception e) {
+ if (sinkMetricData != null) {
+                                            sinkMetricData.invokeDirty(rowSize, dataSize);
+ }
+ resetStateAfterFlush();
+ }
+ }
+ }
+ },
+ executionOptions.getBatchIntervalMs(),
+ executionOptions.getBatchIntervalMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+     * Get or create a StatementExecutor for one table.
+     *
+     * @param tableIdentifier The table identifier for which to get the statement executor.
+ */
+ private JdbcExec getOrCreateStatementExecutor(
+ String tableIdentifier) throws IOException {
+ if (StringUtils.isBlank(tableIdentifier)) {
+ return null;
+ }
+ JdbcExec jdbcExec = jdbcExecMap.get(tableIdentifier);
+ if (null != jdbcExec) {
+ return jdbcExec;
+ }
+ RowType rowType = rowTypeMap.get(tableIdentifier);
+ LogicalType[] logicalTypes = rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+        String[] fieldNames = rowType.getFields().stream()
+                .map(RowType.RowField::getName)
+                .toArray(String[]::new);
+        TypeInformation<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType);
+ List<String> pkNameList = null;
+ if (null != pkNameMap.get(tableIdentifier)) {
+ pkNameList = pkNameMap.get(tableIdentifier);
+ }
+ StatementExecutorFactory<JdbcExec> statementExecutorFactory = null;
+ if (CollectionUtils.isNotEmpty(pkNameList) && !appendMode) {
+ // upsert query
+ JdbcDmlOptions createDmlOptions = JdbcDmlOptions.builder()
+ .withTableName(getTbNameFromIdentifier(tableIdentifier))
+ .withDialect(jdbcOptions.getDialect())
+                    .withFieldNames(fieldNames)
+                    .withKeyFields(pkNameList.toArray(new String[pkNameList.size()]))
+ .build();
+            statementExecutorFactory = ctx -> (JdbcExec) JdbcMultiBatchingComm.createBufferReduceExecutor(
+ createDmlOptions, ctx, rowDataTypeInfo, logicalTypes);
+
+ } else {
+ // append only query
+ final String sql = dmlOptions
+ .getDialect()
+                    .getInsertIntoStatement(getTbNameFromIdentifier(tableIdentifier), fieldNames);
+            statementExecutorFactory = ctx -> (JdbcExec) JdbcMultiBatchingComm.createSimpleBufferedExecutor(
+                    ctx,
+                    dmlOptions.getDialect(),
+                    fieldNames,
+                    logicalTypes,
+                    sql,
+                    rowDataTypeInfo);
+ }
+
+ jdbcExec = statementExecutorFactory.apply(getRuntimeContext());
+ try {
+ JdbcOptions jdbcExecOptions =
+ JdbcOptions.builder()
+                            .setDBUrl(jdbcOptions.getDbURL() + "/" + getTDbNameFromIdentifier(tableIdentifier))
+                            .setTableName(getTbNameFromIdentifier(tableIdentifier))
+ .setDialect(jdbcOptions.getDialect())
+ .setParallelism(jdbcOptions.getParallelism())
+                            .setConnectionCheckTimeoutSeconds(jdbcOptions.getConnectionCheckTimeoutSeconds())
+ .setDriverName(jdbcOptions.getDriverName())
+ .setUsername(jdbcOptions.getUsername().orElse(""))
+ .setPassword(jdbcOptions.getPassword().orElse(""))
+ .build();
+            SimpleJdbcConnectionProvider tableConnectionProvider = new SimpleJdbcConnectionProvider(jdbcExecOptions);
+ try {
+ tableConnectionProvider.getOrEstablishConnection();
+ } catch (Exception e) {
+                LOG.error("unable to open JDBC writer, tableIdentifier:{} err:", tableIdentifier, e);
+ return null;
+ }
+            connectionExecProviderMap.put(tableIdentifier, tableConnectionProvider);
+            jdbcExec.prepareStatements(tableConnectionProvider.getConnection());
+ } catch (Exception e) {
+ return null;
+ }
+ jdbcExecMap.put(tableIdentifier, jdbcExec);
+ return jdbcExec;
+ }
+
+ /**
+     * Get the table name from a tableIdentifier.
+     * A tableIdentifier may be ${dbName}.${tbName} or ${dbName}.${schemaName}.${tbName}.
+     *
+     * @param tableIdentifier The table identifier for which to get the table name.
+ */
+ private String getTbNameFromIdentifier(String tableIdentifier) {
+ String[] fileArray = tableIdentifier.split("\\.");
+ if (2 == fileArray.length) {
+ return fileArray[1];
+ }
+ if (3 == fileArray.length) {
+ return fileArray[1] + "." + fileArray[2];
+ }
+ return null;
+ }
+
+ private String getTDbNameFromIdentifier(String tableIdentifier) {
+ String[] fileArray = tableIdentifier.split("\\.");
+ return fileArray[0];
+ }
+
+ private void checkFlushException() {
+        if (schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)
+ && !tableExceptionMap.isEmpty()) {
+            String tableErr = "Writing records failed for tables: ";
+            for (Map.Entry<String, Exception> entry : tableExceptionMap.entrySet()) {
+                LOG.error("Writing table:{} failed with error:{}", entry.getKey(), entry.getValue().getMessage());
+ tableErr = tableErr + entry.getKey() + ",";
+ }
+            throw new RuntimeException(tableErr.substring(0, tableErr.length() - 1));
+ }
+ }
+
+ /**
+     * Extract a record and write it to recordsMap (the buffer).
+ *
+ * @param row The record to write.
+ */
+ @Override
+ public final synchronized void writeRecord(In row) throws IOException {
+ checkFlushException();
+ JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ if (row instanceof RowData) {
+ RowData rowData = (RowData) row;
+            JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
+ String tableIdentifier = null;
+ try {
+ if (StringUtils.isBlank(schemaPattern)) {
+ tableIdentifier = StringUtils.join(
+                            jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
+                            jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+                } else {
+                    tableIdentifier = StringUtils.join(
+                            jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
+                            jsonDynamicSchemaFormat.parse(rootNode, schemaPattern), ".",
+                            jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+ }
+ } catch (Exception e) {
+                LOG.info("Calculating tableIdentifier failed:", e);
+ return;
+ }
+ rowSize++;
+            dataSize = dataSize + rootNode.toString().getBytes(StandardCharsets.UTF_8).length;
+
+ GenericRowData record = null;
+ try {
+                RowType rowType = jsonDynamicSchemaFormat.extractSchema(rootNode);
+ if (rowType != null) {
+ if (null != rowTypeMap.get(tableIdentifier)) {
+ if (!rowType.equals(rowTypeMap.get(tableIdentifier))) {
+ attemptFlush();
+ rowTypeMap.put(tableIdentifier, rowType);
+ updateOneExecutor(true, tableIdentifier);
+ }
+ } else {
+ rowTypeMap.put(tableIdentifier, rowType);
+ }
+ }
+                List<String> pkNameList = jsonDynamicSchemaFormat.extractPrimaryKeyNames(rootNode);
+ pkNameMap.put(tableIdentifier, pkNameList);
+                JsonNode physicalData = jsonDynamicSchemaFormat.getPhysicalData(rootNode);
+                List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(physicalData);
+ record = generateRecord(rowType, physicalDataList.get(0));
+ List<RowKind> rowKinds = jsonDynamicSchemaFormat
+                        .opType2RowKind(jsonDynamicSchemaFormat.getOpType(rootNode));
+ record.setRowKind(rowKinds.get(rowKinds.size() - 1));
+ } catch (Exception e) {
+ LOG.warn("Extract schema failed", e);
+ return;
+ }
+ try {
+                recordsMap.computeIfAbsent(tableIdentifier, k -> new ArrayList<>())
+ .add(record);
+ batchCount++;
+ if (executionOptions.getBatchSize() > 0
+ && batchCount >= executionOptions.getBatchSize()) {
+ flush();
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowSize, dataSize);
+ }
+ resetStateAfterFlush();
+ }
+ } catch (Exception e) {
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
+ }
+ resetStateAfterFlush();
+ throw new IOException("Writing records to JDBC failed.", e);
+ }
+ }
+ }
+
+ /**
+     * Convert a fieldMap (data) to GenericRowData using rowType (schema).
+ */
+    protected GenericRowData generateRecord(RowType rowType, Map<String, String> fieldMap) {
+ String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+ int arity = fieldNames.length;
+ GenericRowData record = new GenericRowData(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ String fieldValue = fieldMap.get(fieldName);
+ if (StringUtils.isBlank(fieldValue)) {
+ record.setField(i, null);
+ } else {
+ switch (rowType.getFields().get(i).getType().getTypeRoot()) {
+ case BIGINT:
+ record.setField(i, Long.valueOf(fieldValue));
+ break;
+ case BOOLEAN:
+ record.setField(i, Boolean.valueOf(fieldValue));
+ break;
+ case DOUBLE:
+ case DECIMAL:
+ record.setField(i, Double.valueOf(fieldValue));
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case INTERVAL_DAY_TIME:
+                        TimestampData timestampData = TimestampData.fromEpochMillis(Long.valueOf(fieldValue));
+ record.setField(i, timestampData);
+ break;
+ case BINARY:
+                        record.setField(i, Arrays.toString(fieldValue.getBytes(StandardCharsets.UTF_8)));
+ break;
+ default:
+ record.setField(i, StringData.fromString(fieldValue));
+ }
+ }
+ }
+ return record;
+ }
+
+ private void resetStateAfterFlush() {
+ dataSize = 0L;
+ rowSize = 0L;
+ }
+
+ @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ if (sinkMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+
+ @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+ if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+
+ }
+ if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ checkFlushException();
+ attemptFlush();
+ batchCount = 0;
+ }
+
+ /**
+     * Write all records from recordsMap to the database.
+     *
+     * Batch writing is attempted first. If the batch write throws an exception,
+     * the records are rewritten one by one, up to the retry count set by the user.
+ */
+ protected void attemptFlush() throws IOException {
+        for (Map.Entry<String, List<GenericRowData>> entry : recordsMap.entrySet()) {
+ String tableIdentifier = entry.getKey();
+ boolean isIgnoreTableIdentifierException = isIgnoreTableException
+ && (null != tableExceptionMap.get(tableIdentifier));
+ if (isIgnoreTableIdentifierException) {
+ continue;
+ }
+ List<GenericRowData> tableIdRecordList = entry.getValue();
+ if (CollectionUtils.isEmpty(tableIdRecordList)) {
+ continue;
+ }
+ JdbcExec jdbcStatementExecutor = null;
+ Boolean flushFlag = false;
+ Exception tableException = null;
+ try {
+                jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
+ for (GenericRowData record : tableIdRecordList) {
+ jdbcStatementExecutor.addToBatch((JdbcIn) record);
+ }
+ jdbcStatementExecutor.executeBatch();
+ flushFlag = true;
+ } catch (Exception e) {
+ tableException = e;
+                LOG.warn("Flush all data for tableIdentifier:{} get err:", tableIdentifier, e);
+                getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
+ updateOneExecutor(true, tableIdentifier);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+                            "unable to flush; interrupted while doing another attempt", e);
+ }
+ }
+
+ if (!flushFlag) {
+ for (GenericRowData record : tableIdRecordList) {
+                for (int retryTimes = 1; retryTimes <= executionOptions.getMaxRetries(); retryTimes++) {
+ try {
+                            jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
+ jdbcStatementExecutor.addToBatch((JdbcIn) record);
+ jdbcStatementExecutor.executeBatch();
+ flushFlag = true;
+ break;
+ } catch (Exception e) {
+                            LOG.error("Flush one record tableIdentifier:{}, retryTimes:{} get err:",
+                                    tableIdentifier, retryTimes, e);
+                            getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
+ tableException = e;
+ updateOneExecutor(true, tableIdentifier);
+ try {
+ Thread.sleep(1000 * retryTimes);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+                                        "unable to flush; interrupted while doing another attempt", e);
+ }
+ }
+ }
+ if (!flushFlag && null != tableException) {
+ LOG.info("Put tableIdentifier:{} exception:{}",
+ tableIdentifier, tableException.getMessage());
+ tableExceptionMap.put(tableIdentifier, tableException);
+ if (isIgnoreTableException) {
+                            LOG.info("Stop writing table:{} because an exception occurred",
+ tableIdentifier);
+ continue;
+ }
+ }
+ }
+ }
+ tableIdRecordList.clear();
+ }
+ }
+
+ /**
+ * Executes prepared statement and closes all resources of this instance.
+ */
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+
+ if (batchCount > 0) {
+ try {
+ flush();
+ } catch (Exception e) {
+ LOG.warn("Writing records to JDBC failed.", e);
+                    throw new RuntimeException("Writing records to JDBC failed.", e);
+ }
+ }
+
+ try {
+ if (null != jdbcExecMap) {
+ jdbcExecMap.forEach((tableIdentifier, jdbcExec) -> {
+ try {
+ jdbcExec.closeStatements();
+ } catch (SQLException e) {
+                            LOG.error("jdbcExec closeStatements get err", e);
+ }
+ });
+ }
+ } catch (Exception e) {
+ LOG.warn("Close JDBC writer failed.", e);
+ }
+ }
+ super.close();
+ checkFlushException();
+ }
+
+    public boolean getAndSetPkFromErrMsg(String errMsg, String tableIdentifier) {
+        String regex = "Detail: Key \\((.*?)\\)=\\(";
+        Pattern pattern = Pattern.compile(regex);
+        Matcher m = pattern.matcher(errMsg);
+ List<String> pkNameList = new ArrayList<>();
+ if (m.find()) {
+ String[] pkNameArray = m.group(1).split(",");
+ for (String pkName : pkNameArray) {
+ pkNameList.add(pkName.trim());
+ }
+ pkNameMap.put(tableIdentifier, pkNameList);
+ return true;
+ }
+ return false;
+ }
+
+ public void updateOneExecutor(boolean reconnect, String tableIdentifier) {
+ try {
+            SimpleJdbcConnectionProvider tableConnectionProvider = connectionExecProviderMap.get(tableIdentifier);
+ if (reconnect || null == tableConnectionProvider
+ || !tableConnectionProvider.isConnectionValid()) {
+ JdbcExec tableJdbcExec = jdbcExecMap.get(tableIdentifier);
+ if (null != tableJdbcExec) {
+ tableJdbcExec.closeStatements();
+ jdbcExecMap.remove(tableIdentifier);
+ getOrCreateStatementExecutor(tableIdentifier);
+ }
+ }
+ } catch (SQLException | IOException e) {
+ LOG.error("jdbcExec updateOneExecutor get err", e);
+ }
+ }
+
+ /**
+ * A factory for creating {@link JdbcBatchStatementExecutor} instance.
+ *
+ * @param <T> The type of instance.
+ */
+    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
+ extends
+ Function<RuntimeContext, T>,
+ Serializable {
+
+ }
+
+}
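Two small pieces of the class above are easy to check in isolation: the identifier splitting (db[.schema].table goes into the JDBC URL and the statement's table name) and the primary-key recovery from a PostgreSQL duplicate-key message. A standalone sketch; class and method names are mine, the logic is copied from the diff.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Standalone sketch of identifier handling and the PK-from-error-message
    // regex used by JdbcMultiBatchingOutputFormat; names are illustrative.
    public class IdentifierSketch {

        // ${db}.${table} -> "table"; ${db}.${schema}.${table} -> "schema.table"
        static String tableName(String id) {
            String[] parts = id.split("\\.");
            if (parts.length == 2) {
                return parts[1];
            }
            if (parts.length == 3) {
                return parts[1] + "." + parts[2];
            }
            return null;
        }

        static String databaseName(String id) {
            return id.split("\\.")[0];
        }

        // PostgreSQL unique-violation messages look like:
        //   "... Detail: Key (id)=(1) already exists."
        static String pkFromErrMsg(String errMsg) {
            Matcher m = Pattern.compile("Detail: Key \\((.*?)\\)=\\(").matcher(errMsg);
            return m.find() ? m.group(1) : null;
        }

        public static void main(String[] args) {
            System.out.println(tableName("mydb.public.users"));    // public.users
            System.out.println(databaseName("mydb.public.users")); // mydb
            System.out.println(pkFromErrMsg(
                    "ERROR: duplicate key. Detail: Key (id)=(1) already exists.")); // id
        }
    }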
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 4a0bfa228..531864cd4 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -37,9 +37,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.jdbc.internal.JdbcBatchingOutputFormat;
+import org.apache.inlong.sort.jdbc.internal.JdbcMultiBatchingOutputFormat;
import java.io.Serializable;
import java.util.Arrays;
@@ -67,6 +69,11 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
private DataType[] fieldDataTypes;
private String inlongMetric;
private String auditHostAndPorts;
+ private String sinkMultipleFormat;
+ private String databasePattern;
+ private String tablePattern;
+ private String schemaPattern;
+ private SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
private DirtyOptions dirtyOptions;
private DirtySink<Object> dirtySink;
@@ -243,6 +250,32 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
return this;
}
+    public JdbcDynamicOutputFormatBuilder setSinkMultipleFormat(String sinkMultipleFormat) {
+ this.sinkMultipleFormat = sinkMultipleFormat;
+ return this;
+ }
+
+    public JdbcDynamicOutputFormatBuilder setDatabasePattern(String databasePattern) {
+ this.databasePattern = databasePattern;
+ return this;
+ }
+
+    public JdbcDynamicOutputFormatBuilder setTablePattern(String tablePattern) {
+ this.tablePattern = tablePattern;
+ return this;
+ }
+
+    public JdbcDynamicOutputFormatBuilder setSchemaPattern(String schemaPattern) {
+ this.schemaPattern = schemaPattern;
+ return this;
+ }
+
+ public JdbcDynamicOutputFormatBuilder setSchemaUpdatePolicy(
+ SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+ this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
+ return this;
+ }
+
    public JdbcDynamicOutputFormatBuilder setDirtyOptions(DirtyOptions dirtyOptions) {
this.dirtyOptions = dirtyOptions;
return this;
@@ -298,4 +331,23 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
dirtySink);
}
}
+
+ public JdbcMultiBatchingOutputFormat<RowData, ?, ?> buildMulti() {
+ checkNotNull(jdbcOptions, "jdbc options can not be null");
+ checkNotNull(dmlOptions, "jdbc dml options can not be null");
+        checkNotNull(executionOptions, "jdbc execution options can not be null");
+ return new JdbcMultiBatchingOutputFormat<>(
+ new SimpleJdbcConnectionProvider(jdbcOptions),
+ executionOptions,
+ dmlOptions,
+ appendMode,
+ jdbcOptions,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ schemaPattern,
+ inlongMetric,
+ auditHostAndPorts,
+ schemaUpdateExceptionPolicy);
+ }
}
\ No newline at end of file
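For orientation, this is roughly how a caller could assemble the multi-table format through the new builder methods. All setters and buildMulti() exist in this commit; the option values are placeholders, and the three option objects are assumed to be prebuilt elsewhere.

    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
    import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
    import org.apache.flink.table.data.RowData;
    import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
    import org.apache.inlong.sort.jdbc.internal.JdbcMultiBatchingOutputFormat;
    import org.apache.inlong.sort.jdbc.table.JdbcDynamicOutputFormatBuilder;

    // Sketch: wiring the new setters together; in the real flow
    // JdbcDynamicTableSink performs this when 'sink.multiple.enable' is true.
    public class BuildMultiSketch {

        static JdbcMultiBatchingOutputFormat<RowData, ?, ?> buildMulti(
                JdbcOptions jdbcOptions,
                JdbcDmlOptions dmlOptions,
                JdbcExecutionOptions executionOptions) {
            return new JdbcDynamicOutputFormatBuilder()
                    .setJdbcOptions(jdbcOptions)
                    .setJdbcDmlOptions(dmlOptions)
                    .setJdbcExecutionOptions(executionOptions)
                    .setAppendMode(false)
                    .setSinkMultipleFormat("canal-json")       // assumed format value
                    .setDatabasePattern("${database}")
                    .setSchemaPattern("${schema}")
                    .setTablePattern("${table}")
                    .setSchemaUpdatePolicy(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)
                    .buildMulti();
        }
    }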
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 533224f00..e12128ec8 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -41,16 +41,25 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.JdbcUrlUtils;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
-import org.apache.inlong.sort.base.util.JdbcUrlUtils;
-import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
/**
* Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
@@ -182,6 +191,14 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
.defaultValue(false)
.withDescription("Whether to support sink update/delete
data without primaryKey.");
+ public static final ConfigOption<String> SINK_MULTIPLE_SCHEMA_PATTERN =
+ ConfigOptions.key("sink.multiple.schema-pattern")
+ .stringType()
+ .noDefaultValue()
+                    .withDescription("The option 'sink.multiple.schema-pattern' "
+                            + "is used to extract the schema name from the raw binary data, "
+                            + "this is only used in the multiple sink writing scenario.");
+
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper =
@@ -190,12 +207,20 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
helper.validateExcept(DIRTY_PREFIX);
validateConfigOptions(config);
+        boolean multipleSink = config.getOptional(SINK_MULTIPLE_ENABLE).orElse(false);
+        String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
+        String databasePattern = helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null);
+        String tablePattern = helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
+        String schemaPattern = helper.getOptions().getOptional(SINK_MULTIPLE_SCHEMA_PATTERN).orElse(databasePattern);
+        validateSinkMultiple(multipleSink, sinkMultipleFormat, databasePattern, schemaPattern, tablePattern);
JdbcOptions jdbcOptions = getJdbcOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
boolean appendMode = config.get(SINK_APPEND_MODE);
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
        String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
+        SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy =
+                helper.getOptions().getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(null);
// Build the dirty data side-output
        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(helper.getOptions());
        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
@@ -205,8 +230,14 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
getJdbcDmlOptions(jdbcOptions, physicalSchema),
physicalSchema,
appendMode,
+ multipleSink,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ schemaPattern,
inlongMetric,
auditHostAndPorts,
+ schemaUpdateExceptionPolicy,
dirtyOptions,
dirtySink);
}
@@ -228,6 +259,26 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
physicalSchema);
}
+    private void validateSinkMultiple(boolean multipleSink, String sinkMultipleFormat,
+            String databasePattern, String schemaPattern, String tablePattern) {
+        Preconditions.checkNotNull(multipleSink, "The option 'sink.multiple.enable' is not allowed null");
+ if (multipleSink) {
+            Preconditions.checkNotNull(databasePattern, "The option 'sink.multiple.database-pattern'"
+                    + " is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+            Preconditions.checkNotNull(schemaPattern, "The option 'sink.multiple.schema-pattern'"
+                    + " is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+            Preconditions.checkNotNull(tablePattern, "The option 'sink.multiple.table-pattern' "
+                    + "is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+            Preconditions.checkNotNull(sinkMultipleFormat, "The option 'sink.multiple.format' "
+                    + "is not allowed blank when the option 'sink.multiple.enable' is 'true'");
+ DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+            Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
+            Preconditions.checkArgument(supportFormats.contains(sinkMultipleFormat), String.format(
+                    "Unsupported value '%s' for '%s'. Supported values are %s.",
+                    sinkMultipleFormat, SINK_MULTIPLE_FORMAT.key(), supportFormats));
+ }
+ }
+
private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        String url = JdbcUrlUtils.replaceInvalidUrlProperty(readableConfig.get(URL));
        Optional<String> dialectImplOptional = readableConfig.getOptional(DIALECT_IMPL);
@@ -334,6 +385,12 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
optionalOptions.add(MAX_RETRY_TIMEOUT);
optionalOptions.add(DIALECT_IMPL);
+ optionalOptions.add(SINK_MULTIPLE_ENABLE);
+ optionalOptions.add(SINK_MULTIPLE_FORMAT);
+ optionalOptions.add(SINK_MULTIPLE_DATABASE_PATTERN);
+ optionalOptions.add(SINK_MULTIPLE_TABLE_PATTERN);
+ optionalOptions.add(SINK_MULTIPLE_SCHEMA_PATTERN);
+ optionalOptions.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
optionalOptions.add(INLONG_METRIC);
optionalOptions.add(INLONG_AUDIT);
return optionalOptions;
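The validation added above fails fast on misconfiguration. A condensed, dependency-free sketch of the same checks; the supported-format set below is an assumption for illustration, since the real code consults DynamicSchemaFormatFactory.SUPPORT_FORMATS and Flink's Preconditions.

    import java.util.Set;

    // Dependency-free sketch of validateSinkMultiple's checks; names and the
    // supported-format set are illustrative, the rules mirror the diff.
    public class MultiSinkValidationSketch {

        static final Set<String> SUPPORTED_FORMATS = Set.of("canal-json", "debezium-json"); // assumed

        static void validate(boolean multipleSink, String format,
                String databasePattern, String schemaPattern, String tablePattern) {
            if (!multipleSink) {
                return;
            }
            require(databasePattern, "sink.multiple.database-pattern");
            require(schemaPattern, "sink.multiple.schema-pattern");
            require(tablePattern, "sink.multiple.table-pattern");
            require(format, "sink.multiple.format");
            if (!SUPPORTED_FORMATS.contains(format)) {
                throw new IllegalArgumentException(String.format(
                        "Unsupported value '%s' for 'sink.multiple.format'. Supported values are %s.",
                        format, SUPPORTED_FORMATS));
            }
        }

        static void require(String value, String option) {
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("The option '" + option
                        + "' is required when 'sink.multiple.enable' is 'true'");
            }
        }

        public static void main(String[] args) {
            validate(true, "canal-json", "${database}", "${schema}", "${table}"); // passes
        }
    }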
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index e214e61d1..d3098ee11 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
@@ -52,9 +53,15 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
private final TableSchema tableSchema;
private final String dialectName;
+ private final boolean multipleSink;
private final String inlongMetric;
private final String auditHostAndPorts;
private final boolean appendMode;
+ private final String sinkMultipleFormat;
+ private final String databasePattern;
+ private final String tablePattern;
+ private final String schemaPattern;
+ private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
@@ -65,8 +72,14 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
JdbcDmlOptions dmlOptions,
TableSchema tableSchema,
boolean appendMode,
+ boolean multipleSink,
+ String sinkMultipleFormat,
+ String databasePattern,
+ String tablePattern,
+ String schemaPattern,
String inlongMetric,
String auditHostAndPorts,
+ SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy,
DirtyOptions dirtyOptions,
@Nullable DirtySink<Object> dirtySink) {
this.jdbcOptions = jdbcOptions;
@@ -75,15 +88,23 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
this.tableSchema = tableSchema;
this.dialectName = dmlOptions.getDialect().dialectName();
this.appendMode = appendMode;
+ this.multipleSink = multipleSink;
+ this.sinkMultipleFormat = sinkMultipleFormat;
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ this.schemaPattern = schemaPattern;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
- validatePrimaryKey(requestedMode);
+ if (!multipleSink) {
+ validatePrimaryKey(requestedMode);
+ }
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
@@ -103,25 +124,37 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
final TypeInformation<RowData> rowDataTypeInformation =
context.createTypeInformation(tableSchema.toRowDataType());
        final JdbcDynamicOutputFormatBuilder builder = new JdbcDynamicOutputFormatBuilder();
-
builder.setAppendMode(appendMode);
builder.setJdbcOptions(jdbcOptions);
builder.setJdbcDmlOptions(dmlOptions);
builder.setJdbcExecutionOptions(executionOptions);
- builder.setRowDataTypeInfo(rowDataTypeInformation);
- builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
builder.setInLongMetric(inlongMetric);
builder.setAuditHostAndPorts(auditHostAndPorts);
builder.setDirtyOptions(dirtyOptions);
builder.setDirtySink(dirtySink);
-        return SinkFunctionProvider.of(
-                new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
+ if (multipleSink) {
+ builder.setSinkMultipleFormat(sinkMultipleFormat);
+ builder.setDatabasePattern(databasePattern);
+ builder.setTablePattern(tablePattern);
+ builder.setSchemaPattern(schemaPattern);
+ builder.setSchemaUpdatePolicy(schemaUpdateExceptionPolicy);
+            return SinkFunctionProvider.of(
+                    new GenericJdbcSinkFunction<>(builder.buildMulti()), jdbcOptions.getParallelism());
+ } else {
+ builder.setRowDataTypeInfo(rowDataTypeInformation);
+ builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
+            return SinkFunctionProvider.of(
+                    new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
+ }
}
@Override
public DynamicTableSink copy() {
        return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions,
-                tableSchema, appendMode, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
+ tableSchema, appendMode, multipleSink, sinkMultipleFormat,
+ databasePattern, tablePattern, schemaPattern,
+ inlongMetric, auditHostAndPorts,
+ schemaUpdateExceptionPolicy, dirtyOptions, dirtySink);
}
@Override