This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new f2110a4316 [INLONG-10055][Sort] Support flink-connector-jdbc based on flink 1.18 (#10073)
f2110a4316 is described below
commit f2110a43167cf12646956730a7997b8099b88d9d
Author: AloysZhang <[email protected]>
AuthorDate: Fri Apr 26 18:52:12 2024 +0800
[INLONG-10055][Sort] Support flink-connector-jdbc based on flink 1.18 (#10073)
---
.../src/main/assemblies/sort-connectors-v1.18.xml | 9 +
.../sort-flink-v1.18/sort-connectors/jdbc/pom.xml | 117 +++++
.../jdbc/internal/GenericJdbcSinkFunction.java | 82 ++++
.../sort/jdbc/internal/JdbcOutputFormat.java | 499 +++++++++++++++++++++
.../jdbc/internal/TableJdbcUpsertOutputFormat.java | 225 ++++++++++
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 357 +++++++++++++++
.../sort/jdbc/table/JdbcDynamicTableSink.java | 148 ++++++
.../sort/jdbc/table/JdbcOutputFormatBuilder.java | 283 ++++++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../sort-flink-v1.18/sort-connectors/pom.xml | 1 +
licenses/inlong-sort-connectors/LICENSE | 9 +
11 files changed, 1746 insertions(+)
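
For readers who want to try the new connector, a minimal usage sketch (not part of this commit; the table name and database coordinates are placeholders): the factory added below registers under the identifier "jdbc-inlong", and 'url', 'table-name', 'username' and 'password' are the stock Flink JDBC connector options.

    // Hypothetical Flink 1.18 job exercising the new sink; all values are placeholders.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcInlongSinkExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'jdbc-inlong' is the factory identifier defined in JdbcDynamicTableFactory below.
            tEnv.executeSql(
                    "CREATE TABLE jdbc_sink (\n"
                            + "  id BIGINT,\n"
                            + "  name STRING,\n"
                            + "  PRIMARY KEY (id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'jdbc-inlong',\n"
                            + "  'url' = 'jdbc:mysql://localhost:3306/test',\n"
                            + "  'table-name' = 'sink_table',\n"
                            + "  'username' = 'root',\n"
                            + "  'password' = '***'\n"
                            + ")");
            tEnv.executeSql("INSERT INTO jdbc_sink VALUES (1, 'a')");
        }
    }

The declared PRIMARY KEY is what routes writes through the upsert path (TableJdbcUpsertOutputFormat below) instead of plain inserts.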
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
index 61465915fa..04e1fce16b 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
@@ -36,5 +36,14 @@
<fileMode>0644</fileMode>
</fileSet>
+    <fileSet>
+        <directory>../inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/target</directory>
+        <outputDirectory>inlong-sort/connectors</outputDirectory>
+        <includes>
+            <include>sort-connector-jdbc-v1.18-${project.version}.jar</include>
+        </includes>
+        <fileMode>0644</fileMode>
+    </fileSet>
+
</fileSets>
</assembly>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml
new file mode 100644
index 0000000000..dcd7e24afc
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors-v1.18</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-connector-jdbc-v1.18</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort-connector-jdbc</name>
+
+ <properties>
+ <flink.connector.jdbc.version>3.1.2-1.18</flink.connector.jdbc.version>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-jdbc</artifactId>
+ <version>${flink.connector.jdbc.version}</version>
+ </dependency>
+
+ <!--for mysql-->
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Postgres -->
+
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+
+ <!-- Oracle -->
+ <dependency>
+ <groupId>com.oracle.database.jdbc</groupId>
+ <artifactId>ojdbc8</artifactId>
+ </dependency>
+
+ <!-- SQL Server -->
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+                            <filter>
+                                <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                <includes>
+                                    <include>org/apache/inlong/**</include>
+                                    <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                </includes>
+                            </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
new file mode 100644
index 0000000000..39c6e45ec0
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * A generic SinkFunction for JDBC.
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction}.
+ */
+@Internal
+public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
+ implements
+ CheckpointedFunction,
+ InputTypeConfigurable {
+
+ private final JdbcOutputFormat<T, ?, ?> outputFormat;
+
+    public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> outputFormat) {
+ this.outputFormat = Preconditions.checkNotNull(outputFormat);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ RuntimeContext ctx = getRuntimeContext();
+ outputFormat.setRuntimeContext(ctx);
+        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void invoke(T value, Context context) throws IOException {
+ outputFormat.writeRecord(value);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) {
+ }
+
+ @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ outputFormat.flush();
+ }
+
+ @Override
+ public void close() {
+ outputFormat.close();
+ }
+
+ @Override
+    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+ outputFormat.setInputType(type, executionConfig);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
new file mode 100644
index 0000000000..835ed77e00
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.function.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A JDBC outputFormat that supports batching records before writing them to the database.
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.JdbcOutputFormat}.
+ */
+@Internal
+public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
+ extends
+ RichOutputFormat<In>
+ implements
+ Flushable,
+ InputTypeConfigurable {
+
+ protected final JdbcConnectionProvider connectionProvider;
+ @Nullable
+ private TypeSerializer<In> serializer;
+
+ // audit
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private final String auditKeys;
+ private SinkMetricData sinkMetricData;
+ private Long rowCount = 0L;
+ private Long dataSize = 0L;
+
+ @Override
+ @SuppressWarnings("unchecked")
+    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+ if (executionConfig.isObjectReuseEnabled()) {
+            this.serializer = (TypeSerializer<In>) type.createSerializer(executionConfig);
+ }
+ }
+
+ /**
+ * An interface to extract a value from given argument.
+ *
+ * @param <F> The type of given argument
+ * @param <T> The type of the return value
+ */
+    public interface RecordExtractor<F, T> extends Function<F, T>, Serializable {
+
+ static <T> JdbcOutputFormat.RecordExtractor<T, T> identity() {
+ return x -> x;
+ }
+ }
+
+ /**
+ * A factory for creating {@link JdbcBatchStatementExecutor} instance.
+ *
+ * @param <T> The type of instance.
+ */
+    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
+ extends
+ SerializableFunction<RuntimeContext, T> {
+ }
+
+ private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class);
+
+ private final JdbcExecutionOptions executionOptions;
+    private final JdbcOutputFormat.StatementExecutorFactory<JdbcExec> statementExecutorFactory;
+    private final JdbcOutputFormat.RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
+
+ private transient JdbcExec jdbcStatementExecutor;
+ private transient int batchCount = 0;
+ private transient volatile boolean closed = false;
+
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
+ private transient volatile Exception flushException;
+
+ public JdbcOutputFormat(
+ @Nonnull JdbcConnectionProvider connectionProvider,
+ @Nonnull JdbcExecutionOptions executionOptions,
+            @Nonnull JdbcOutputFormat.StatementExecutorFactory<JdbcExec> statementExecutorFactory,
+            @Nonnull JdbcOutputFormat.RecordExtractor<In, JdbcIn> recordExtractor,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+ this.connectionProvider = checkNotNull(connectionProvider);
+ this.executionOptions = checkNotNull(executionOptions);
+ this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
+ this.jdbcRecordExtractor = checkNotNull(recordExtractor);
+ // audit
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ /**
+ * Connects to the target database and initializes the prepared statement.
+ *
+ * @param taskNumber The number of the parallel instance.
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ try {
+ connectionProvider.getOrEstablishConnection();
+ } catch (Exception e) {
+ throw new IOException("unable to open JDBC writer", e);
+ }
+ // audit
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+ if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ }
+
+        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+ this.scheduler =
+ Executors.newScheduledThreadPool(
+                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+ this.scheduledFuture =
+ this.scheduler.scheduleWithFixedDelay(
+ () -> {
+ synchronized (JdbcOutputFormat.this) {
+ if (!closed) {
+ try {
+ flush();
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }
+ },
+ executionOptions.getBatchIntervalMs(),
+ executionOptions.getBatchIntervalMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private JdbcExec createAndOpenStatementExecutor(
+            JdbcOutputFormat.StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
+ JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+ try {
+ exec.prepareStatements(connectionProvider.getConnection());
+ } catch (SQLException e) {
+ throw new IOException("unable to open JDBC writer", e);
+ }
+ return exec;
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+            throw new RuntimeException("Writing records to JDBC failed.", flushException);
+ }
+ }
+
+ @Override
+ public final synchronized void writeRecord(In record) throws IOException {
+ checkFlushException();
+ updateMetric(record);
+ try {
+ In recordCopy = copyIfNecessary(record);
+ addToBatch(record, jdbcRecordExtractor.apply(recordCopy));
+ batchCount++;
+ if (executionOptions.getBatchSize() > 0
+ && batchCount >= executionOptions.getBatchSize()) {
+ flush();
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowCount, dataSize);
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Writing records to JDBC failed.", e);
+ }
+ }
+
+ private void updateMetric(In record) {
+ rowCount++;
+ dataSize += CalculateObjectSizeUtils.getDataSize(record);
+ }
+
+ private In copyIfNecessary(In record) {
+ return serializer == null ? record : serializer.copy(record);
+ }
+
+    protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
+ jdbcStatementExecutor.addToBatch(extracted);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ checkFlushException();
+
+ for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
+ try {
+ attemptFlush();
+ batchCount = 0;
+ break;
+ } catch (SQLException e) {
+ LOG.error("JDBC executeBatch error, retry times = {}", i, e);
+ if (i >= executionOptions.getMaxRetries()) {
+ throw new IOException(e);
+ }
+ try {
+ if (!connectionProvider.isConnectionValid()) {
+ updateExecutor(true);
+ }
+ } catch (Exception exception) {
+ LOG.error(
+ "JDBC connection is not valid, and reestablish
connection failed.",
+ exception);
+ throw new IOException("Reestablish JDBC connection
failed", exception);
+ }
+ try {
+ Thread.sleep(1000 * i);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "unable to flush; interrupted while doing another
attempt", e);
+ }
+ }
+ }
+ // audit, flush metrics data
+ if (sinkMetricData != null) {
+ sinkMetricData.flushAuditData();
+ }
+ }
+
+ protected void attemptFlush() throws SQLException {
+ jdbcStatementExecutor.executeBatch();
+ }
+
+    /** Executes prepared statement and closes all resources of this instance. */
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+
+ if (batchCount > 0) {
+ try {
+ flush();
+ } catch (Exception e) {
+ LOG.warn("Writing records to JDBC failed.", e);
+                    throw new RuntimeException("Writing records to JDBC failed.", e);
+ }
+ }
+
+ // audit, flush metrics data
+ if (sinkMetricData != null) {
+ sinkMetricData.flushAuditData();
+ }
+
+ try {
+ if (jdbcStatementExecutor != null) {
+ jdbcStatementExecutor.closeStatements();
+ }
+ } catch (SQLException e) {
+ LOG.warn("Close JDBC writer failed.", e);
+ }
+ }
+ connectionProvider.closeConnection();
+ checkFlushException();
+ }
+
+ public static JdbcOutputFormat.Builder builder() {
+ return new JdbcOutputFormat.Builder();
+ }
+
+ /** Builder for a {@link JdbcOutputFormat}. */
+ public static class Builder {
+
+ private InternalJdbcConnectionOptions options;
+ private String[] fieldNames;
+ private String[] keyFields;
+ private int[] fieldTypes;
+
+ // audit
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+
+ private JdbcExecutionOptions.Builder executionOptionsBuilder =
+ JdbcExecutionOptions.builder();
+
+ /** required, jdbc options. */
+        public JdbcOutputFormat.Builder setOptions(InternalJdbcConnectionOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ /** required, field names of this jdbc sink. */
+ public JdbcOutputFormat.Builder setFieldNames(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ return this;
+ }
+
+ /** required, upsert unique keys. */
+ public JdbcOutputFormat.Builder setKeyFields(String[] keyFields) {
+ this.keyFields = keyFields;
+ return this;
+ }
+
+ /** required, field types of this jdbc sink. */
+ public JdbcOutputFormat.Builder setFieldTypes(int[] fieldTypes) {
+ this.fieldTypes = fieldTypes;
+ return this;
+ }
+
+ /**
+         * optional, flush max size (includes all append, upsert and delete records), over this
+ * number of records, will flush data.
+ */
+ public JdbcOutputFormat.Builder setFlushMaxSize(int flushMaxSize) {
+ executionOptionsBuilder.withBatchSize(flushMaxSize);
+ return this;
+ }
+
+        /** optional, flush interval mills, over this time, asynchronous threads will flush data. */
+        public JdbcOutputFormat.Builder setFlushIntervalMills(long flushIntervalMills) {
+ executionOptionsBuilder.withBatchIntervalMs(flushIntervalMills);
+ return this;
+ }
+
+ /** optional, max retry times for jdbc connector. */
+ public JdbcOutputFormat.Builder setMaxRetryTimes(int maxRetryTimes) {
+ executionOptionsBuilder.withMaxRetries(maxRetryTimes);
+ return this;
+ }
+
+ public JdbcOutputFormat.Builder setInlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
+        public JdbcOutputFormat.Builder setAuditHostAndPorts(String auditHostAndPorts) {
+ this.auditHostAndPorts = auditHostAndPorts;
+ return this;
+ }
+
+ public JdbcOutputFormat.Builder setAuditKeys(String auditKeys) {
+ this.auditKeys = auditKeys;
+ return this;
+ }
+
+ /**
+ * Finalizes the configuration and checks validity.
+ *
+ * @return Configured JdbcUpsertOutputFormat
+ */
+        public JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> build() {
+ checkNotNull(options, "No options supplied.");
+ checkNotNull(fieldNames, "No fieldNames supplied.");
+ JdbcDmlOptions dml =
+ JdbcDmlOptions.builder()
+ .withTableName(options.getTableName())
+ .withDialect(options.getDialect())
+ .withFieldNames(fieldNames)
+ .withKeyFields(keyFields)
+ .withFieldTypes(fieldTypes)
+ .build();
+            if (dml.getKeyFields().isPresent() && dml.getKeyFields().get().length > 0) {
+ return new TableJdbcUpsertOutputFormat(
+ new SimpleJdbcConnectionProvider(options),
+ dml,
+ executionOptionsBuilder.build(),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ } else {
+ // warn: don't close over builder fields
+ String sql =
+ FieldNamedPreparedStatementImpl.parseNamedStatement(
+ options.getDialect()
+ .getInsertIntoStatement(
+                                        dml.getTableName(), dml.getFieldNames()),
+ new HashMap<>());
+ return new JdbcOutputFormat<>(
+ new SimpleJdbcConnectionProvider(options),
+ executionOptionsBuilder.build(),
+ ctx -> createSimpleRowExecutor(
+ sql,
+ dml.getFieldTypes(),
+                                ctx.getExecutionConfig().isObjectReuseEnabled()),
+ tuple2 -> {
+ Preconditions.checkArgument(tuple2.f0);
+ return tuple2.f1;
+ },
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ }
+ }
+ }
+
+ static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(
+ String sql, int[] fieldTypes, boolean objectReuse) {
+ return JdbcBatchStatementExecutor.simple(
+ sql,
+ createRowJdbcStatementBuilder(fieldTypes),
+ objectReuse ? Row::copy : Function.identity());
+ }
+
+ /**
+     * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array.
+     * Uses {@link JdbcUtils#setRecordToStatement}
+     */
+    static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
+ return (st, record) -> setRecordToStatement(st, types, record);
+ }
+
+    public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException {
+ jdbcStatementExecutor.closeStatements();
+ jdbcStatementExecutor.prepareStatements(
+ reconnect
+ ? connectionProvider.reestablishConnection()
+ : connectionProvider.getConnection());
+ }
+
+ /** Returns configured {@code JdbcExecutionOptions}. */
+ public JdbcExecutionOptions getExecutionOptions() {
+ return executionOptions;
+ }
+
+ @VisibleForTesting
+ public Connection getConnection() {
+ return connectionProvider.getConnection();
+ }
+}
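
A sketch of how the DataStream-level pieces above compose (hypothetical wiring; connectionOptions, the field arrays, and the audit strings are placeholders):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
    import org.apache.flink.types.Row;
    import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
    import org.apache.inlong.sort.jdbc.internal.JdbcOutputFormat;

    // Assembling the format via its Builder and wrapping it in GenericJdbcSinkFunction.
    JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> format =
            JdbcOutputFormat.builder()
                    .setOptions(connectionOptions)            // InternalJdbcConnectionOptions placeholder
                    .setFieldNames(new String[]{"id", "name"})
                    .setKeyFields(new String[]{"id"})         // non-empty keys select TableJdbcUpsertOutputFormat
                    .setFlushMaxSize(100)                     // flush after 100 buffered records
                    .setFlushIntervalMills(1000L)             // or after 1s, whichever comes first
                    .setMaxRetryTimes(3)                      // flush() retries with Thread.sleep(1000 * i)
                    .setInlongMetric(inlongMetric)            // InLong metric labels, may be null
                    .setAuditHostAndPorts(auditHostAndPorts)
                    .setAuditKeys(auditKeys)
                    .build();
    upsertStream.addSink(new GenericJdbcSinkFunction<>(format)); // DataStream<Tuple2<Boolean, Row>>

Note that snapshotState() on the wrapping sink function calls flush(), so buffered records are also drained on every checkpoint, in addition to the size and interval triggers.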
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
new file mode 100644
index 0000000000..5c5ff2978c
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getPrimaryKey;
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Upsert JDBC output format.
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.JdbcOutputFormat}.
+ */
+
+class TableJdbcUpsertOutputFormat
+ extends
+            JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TableJdbcUpsertOutputFormat.class);
+
+ private JdbcBatchStatementExecutor<Row> deleteExecutor;
+    private final StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> deleteStatementExecutorFactory;
+
+ // audit
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+
+ TableJdbcUpsertOutputFormat(
+ JdbcConnectionProvider connectionProvider,
+ JdbcDmlOptions dmlOptions,
+ JdbcExecutionOptions batchOptions,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+ this(
+ connectionProvider,
+ batchOptions,
+ ctx -> createUpsertRowExecutor(dmlOptions, ctx),
+ ctx -> createDeleteExecutor(dmlOptions, ctx),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ }
+
+ @VisibleForTesting
+ TableJdbcUpsertOutputFormat(
+ JdbcConnectionProvider connectionProvider,
+ JdbcExecutionOptions batchOptions,
+            StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory,
+            StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> deleteStatementExecutorFactory,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+        super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1,
+ inlongMetric, auditHostAndPorts, auditKeys);
+ this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ super.open(taskNumber, numTasks);
+        deleteExecutor = deleteStatementExecutorFactory.apply(getRuntimeContext());
+ try {
+            deleteExecutor.prepareStatements(connectionProvider.getConnection());
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static JdbcBatchStatementExecutor<Row> createDeleteExecutor(
+ JdbcDmlOptions dmlOptions, RuntimeContext ctx) {
+ int[] pkFields =
+ Arrays.stream(dmlOptions.getFieldNames())
+                        .mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf)
+ .toArray();
+ int[] pkTypes =
+ dmlOptions.getFieldTypes() == null
+ ? null
+                        : Arrays.stream(pkFields).map(f -> dmlOptions.getFieldTypes()[f]).toArray();
+ String deleteSql =
+ FieldNamedPreparedStatementImpl.parseNamedStatement(
+ dmlOptions
+ .getDialect()
+ .getDeleteStatement(
+                                        dmlOptions.getTableName(), dmlOptions.getFieldNames()),
+ new HashMap<>());
+ return createKeyedRowExecutor(pkFields, pkTypes, deleteSql);
+ }
+
+ @Override
+    protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) throws SQLException {
+ if (original.f0) {
+ super.addToBatch(original, extracted);
+ } else {
+ deleteExecutor.addToBatch(extracted);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ try {
+ super.close();
+ } finally {
+ try {
+ if (deleteExecutor != null) {
+ deleteExecutor.closeStatements();
+ }
+ } catch (SQLException e) {
+ LOG.warn("unable to close delete statement runner", e);
+ }
+ }
+ }
+
+ @Override
+ protected void attemptFlush() throws SQLException {
+ super.attemptFlush();
+ deleteExecutor.executeBatch();
+ }
+
+ @Override
+    public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException {
+ super.updateExecutor(reconnect);
+ deleteExecutor.closeStatements();
+ deleteExecutor.prepareStatements(connectionProvider.getConnection());
+ }
+
+ private static JdbcBatchStatementExecutor<Row> createKeyedRowExecutor(
+ int[] pkFields, int[] pkTypes, String sql) {
+ return JdbcBatchStatementExecutor.keyed(
+ sql,
+ createRowKeyExtractor(pkFields),
+ (st, record) -> setRecordToStatement(
+                        st, pkTypes, createRowKeyExtractor(pkFields).apply(record)));
+ }
+
+ private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor(
+ JdbcDmlOptions opt, RuntimeContext ctx) {
+ checkArgument(opt.getKeyFields().isPresent());
+
+ int[] pkFields =
+ Arrays.stream(opt.getKeyFields().get())
+ .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+ .toArray();
+ int[] pkTypes =
+ opt.getFieldTypes() == null
+ ? null
+                        : Arrays.stream(pkFields).map(f -> opt.getFieldTypes()[f]).toArray();
+
+ return opt.getDialect()
+ .getUpsertStatement(
+                        opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get())
+ .map(
+ sql -> createSimpleRowExecutor(
+ parseNamedStatement(sql),
+ opt.getFieldTypes(),
+                                ctx.getExecutionConfig().isObjectReuseEnabled()))
+ .orElseGet(
+ () -> new InsertOrUpdateJdbcExecutor<>(
+ parseNamedStatement(
+ opt.getDialect()
+ .getRowExistsStatement(
+ opt.getTableName(),
+                                                opt.getKeyFields().get())),
+ parseNamedStatement(
+ opt.getDialect()
+ .getInsertIntoStatement(
+ opt.getTableName(),
+ opt.getFieldNames())),
+ parseNamedStatement(
+ opt.getDialect()
+ .getUpdateStatement(
+ opt.getTableName(),
+ opt.getFieldNames(),
+                                                opt.getKeyFields().get())),
+ createRowJdbcStatementBuilder(pkTypes),
+                        createRowJdbcStatementBuilder(opt.getFieldTypes()),
+                        createRowJdbcStatementBuilder(opt.getFieldTypes()),
+ createRowKeyExtractor(pkFields),
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? Row::copy
+ : Function.identity()));
+ }
+
+ private static String parseNamedStatement(String statement) {
+        return FieldNamedPreparedStatementImpl.parseNamedStatement(statement, new HashMap<>());
+ }
+
+ private static Function<Row, Row> createRowKeyExtractor(int[] pkFields) {
+ return row -> getPrimaryKey(row, pkFields);
+ }
+}
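
The Tuple2<Boolean, Row> contract above is easy to miss: addToBatch routes on f0, so true means upsert and false means delete. A tiny illustration with hypothetical records:

    // f0 selects the executor: true -> upsert path, false -> deleteExecutor.
    format.writeRecord(Tuple2.of(true, Row.of(1L, "a")));   // insert-or-update key 1
    format.writeRecord(Tuple2.of(false, Row.of(1L, "a")));  // delete key 1

Since attemptFlush() runs the upsert batch before the delete batch, a buffered delete of a key lands after a buffered upsert of the same key within one flush.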
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
new file mode 100644
index 0000000000..866d498133
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_MAX_RETRIES;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.MAX_RETRY_TIMEOUT;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_NUM;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_MAX_RETRIES;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_PARALLELISM;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link
+ * JdbcDynamicTableSink}.
+ * Modified from {@link org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory}.
+ */
+@Internal
+public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "jdbc-inlong";
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig config = helper.getOptions();
+
+ helper.validate();
+ validateConfigOptions(config, context.getClassLoader());
+ validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
+ InternalJdbcConnectionOptions jdbcOptions =
+ getJdbcOptions(config, context.getClassLoader());
+
+ // inlong audit
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
+ String auditKeys = config.getOptional(AUDIT_KEYS).orElse(null);
+
+ return new JdbcDynamicTableSink(
+ jdbcOptions,
+ getJdbcExecutionOptions(config),
+ getJdbcDmlOptions(
+ jdbcOptions,
+ context.getPhysicalRowDataType(),
+ context.getPrimaryKeyIndexes()),
+ context.getPhysicalRowDataType(),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig config = helper.getOptions();
+
+ helper.validate();
+ validateConfigOptions(config, context.getClassLoader());
+ validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
+
+ return new JdbcDynamicTableSource(
+ getJdbcOptions(helper.getOptions(), context.getClassLoader()),
+ getJdbcReadOptions(helper.getOptions()),
+ helper.getOptions().get(LookupOptions.MAX_RETRIES),
+ getLookupCache(config),
+ context.getPhysicalRowDataType());
+ }
+
+ private static void validateDataTypeWithJdbcDialect(
+ DataType dataType, String url, ClassLoader classLoader) {
+ final JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
+ dialect.validate((RowType) dataType.getLogicalType());
+ }
+
+ private InternalJdbcConnectionOptions getJdbcOptions(
+ ReadableConfig readableConfig, ClassLoader classLoader) {
+ final String url = readableConfig.get(URL);
+ final InternalJdbcConnectionOptions.Builder builder =
+ InternalJdbcConnectionOptions.builder()
+ .setClassLoader(classLoader)
+ .setDBUrl(url)
+ .setTableName(readableConfig.get(TABLE_NAME))
+ .setDialect(JdbcDialectLoader.load(url, classLoader))
+                        .setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
+ .setConnectionCheckTimeoutSeconds(
+                                (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
+
+ readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+ readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+ readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+ return builder.build();
+ }
+
+ private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
+ final Optional<String> partitionColumnName =
+ readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+ final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+ if (partitionColumnName.isPresent()) {
+ builder.setPartitionColumnName(partitionColumnName.get());
+            builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+            builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+ builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+ }
+        readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+ builder.setAutoCommit(readableConfig.get(SCAN_AUTO_COMMIT));
+ return builder.build();
+ }
+
+    private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
+        final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
+ builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+ builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
+ return builder.build();
+ }
+
+ private JdbcDmlOptions getJdbcDmlOptions(
+            InternalJdbcConnectionOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) {
+
+ String[] keyFields =
+ Arrays.stream(primaryKeyIndexes)
+ .mapToObj(i -> DataType.getFieldNames(dataType).get(i))
+ .toArray(String[]::new);
+
+ return JdbcDmlOptions.builder()
+ .withTableName(jdbcOptions.getTableName())
+ .withDialect(jdbcOptions.getDialect())
+                .withFieldNames(DataType.getFieldNames(dataType).toArray(new String[0]))
+ .withKeyFields(keyFields.length > 0 ? keyFields : null)
+ .build();
+ }
+
+ @Nullable
+ private LookupCache getLookupCache(ReadableConfig tableOptions) {
+ LookupCache cache = null;
+ // Legacy cache options
+ if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+                && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) > 0) {
+ cache =
+ DefaultLookupCache.newBuilder()
+                            .maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+                            .expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+                            .cacheMissingKey(tableOptions.get(LOOKUP_CACHE_MISSING_KEY))
+ .build();
+ }
+ if (tableOptions
+ .get(LookupOptions.CACHE_TYPE)
+ .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+ cache = DefaultLookupCache.fromConfig(tableOptions);
+ }
+ return cache;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(URL);
+ requiredOptions.add(TABLE_NAME);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ optionalOptions.add(DRIVER);
+ optionalOptions.add(USERNAME);
+ optionalOptions.add(PASSWORD);
+ optionalOptions.add(SCAN_PARTITION_COLUMN);
+ optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
+ optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
+ optionalOptions.add(SCAN_PARTITION_NUM);
+ optionalOptions.add(SCAN_FETCH_SIZE);
+ optionalOptions.add(SCAN_AUTO_COMMIT);
+ optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
+ optionalOptions.add(LOOKUP_CACHE_TTL);
+ optionalOptions.add(LOOKUP_MAX_RETRIES);
+ optionalOptions.add(LOOKUP_CACHE_MISSING_KEY);
+ optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+ optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
+ optionalOptions.add(SINK_MAX_RETRIES);
+ optionalOptions.add(SINK_PARALLELISM);
+ optionalOptions.add(MAX_RETRY_TIMEOUT);
+ optionalOptions.add(LookupOptions.CACHE_TYPE);
+ optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+ optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+ optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
+ optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+ optionalOptions.add(LookupOptions.MAX_RETRIES);
+ optionalOptions.add(AUDIT_KEYS);
+ optionalOptions.add(INLONG_METRIC);
+ optionalOptions.add(INLONG_AUDIT);
+ return optionalOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> forwardOptions() {
+ return Stream.of(
+ URL,
+ TABLE_NAME,
+ USERNAME,
+ PASSWORD,
+ DRIVER,
+ SINK_BUFFER_FLUSH_MAX_ROWS,
+ SINK_BUFFER_FLUSH_INTERVAL,
+ SINK_MAX_RETRIES,
+ MAX_RETRY_TIMEOUT,
+ SCAN_FETCH_SIZE,
+ SCAN_AUTO_COMMIT)
+ .collect(Collectors.toSet());
+ }
+
+    private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
+ String jdbcUrl = config.get(URL);
+ JdbcDialectLoader.load(jdbcUrl, classLoader);
+
+ checkAllOrNone(config, new ConfigOption[]{USERNAME, PASSWORD});
+
+ checkAllOrNone(
+ config,
+ new ConfigOption[]{
+ SCAN_PARTITION_COLUMN,
+ SCAN_PARTITION_NUM,
+ SCAN_PARTITION_LOWER_BOUND,
+ SCAN_PARTITION_UPPER_BOUND
+ });
+
+ if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
+                && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
+ long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+ long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+ if (lowerBound > upperBound) {
+ throw new IllegalArgumentException(
+ String.format(
+ "'%s'='%s' must not be larger than '%s'='%s'.",
+ SCAN_PARTITION_LOWER_BOUND.key(),
+ lowerBound,
+ SCAN_PARTITION_UPPER_BOUND.key(),
+ upperBound));
+ }
+ }
+
+        checkAllOrNone(config, new ConfigOption[]{LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
+
+ if (config.get(LOOKUP_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative,
but is %s.",
+ LOOKUP_MAX_RETRIES.key(),
config.get(LOOKUP_MAX_RETRIES)));
+ }
+
+ if (config.get(SINK_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative,
but is %s.",
+ SINK_MAX_RETRIES.key(),
config.get(SINK_MAX_RETRIES)));
+ }
+
+ if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option must be in second
granularity and shouldn't be smaller than 1 second, but is %s.",
+ MAX_RETRY_TIMEOUT.key(),
+ config.get(
+ ConfigOptions.key(MAX_RETRY_TIMEOUT.key())
+ .stringType()
+ .noDefaultValue())));
+ }
+ }
+
+    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
+ int presentCount = 0;
+ for (ConfigOption configOption : configOptions) {
+ if (config.getOptional(configOption).isPresent()) {
+ presentCount++;
+ }
+ }
+ String[] propertyNames =
+                Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
+ Preconditions.checkArgument(
+ configOptions.length == presentCount || presentCount == 0,
+ "Either all or none of the following options should be
provided:\n"
+ + String.join("\n", propertyNames));
+ }
+}
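
Beyond the stock JDBC options, optionalOptions() above also admits InLong's metric and audit settings (INLONG_METRIC, INLONG_AUDIT, AUDIT_KEYS). A hedged sketch of enabling them on the DDL from the first example; the option key strings are assumed to match org.apache.inlong.sort.base.Constants, and the endpoint values are placeholders:

    // Hypothetical: audit/metric reporting wired through the sink's SinkMetricData.
    tEnv.executeSql(
            "CREATE TABLE audited_sink (id BIGINT, name STRING) WITH (\n"
                    + "  'connector' = 'jdbc-inlong',\n"
                    + "  'url' = 'jdbc:mysql://localhost:3306/test',\n"
                    + "  'table-name' = 'sink_table',\n"
                    + "  'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1',\n" // assumed key
                    + "  'metrics.audit.proxy.hosts' = '127.0.0.1:10081',\n"            // assumed key
                    + "  'metrics.audit.key' = '17,18'\n"                               // assumed key
                    + ")");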
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
new file mode 100644
index 0000000000..6721130414
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link DynamicTableSink} for JDBC.
+ * Modified from {@link org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableSink implements DynamicTableSink {
+
+ private final InternalJdbcConnectionOptions jdbcOptions;
+ private final JdbcExecutionOptions executionOptions;
+ private final JdbcDmlOptions dmlOptions;
+ private final DataType physicalRowDataType;
+ private final String dialectName;
+
+ // audit
+ String inlongMetric;
+ String auditHostAndPorts;
+ String auditKeys;
+
+ public JdbcDynamicTableSink(
+ InternalJdbcConnectionOptions jdbcOptions,
+ JdbcExecutionOptions executionOptions,
+ JdbcDmlOptions dmlOptions,
+ DataType physicalRowDataType,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+ this.jdbcOptions = jdbcOptions;
+ this.executionOptions = executionOptions;
+ this.dmlOptions = dmlOptions;
+ this.physicalRowDataType = physicalRowDataType;
+ this.dialectName = dmlOptions.getDialect().dialectName();
+ // audit
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ validatePrimaryKey(requestedMode);
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
+
+ private void validatePrimaryKey(ChangelogMode requestedMode) {
+ checkState(
+ ChangelogMode.insertOnly().equals(requestedMode)
+ || dmlOptions.getKeyFields().isPresent(),
+ "please declare primary key for sink table when query contains
update/delete record.");
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final TypeInformation<RowData> rowDataTypeInformation =
+ context.createTypeInformation(physicalRowDataType);
+ final JdbcOutputFormatBuilder builder = new JdbcOutputFormatBuilder();
+
+ builder.setJdbcOptions(jdbcOptions);
+ builder.setJdbcDmlOptions(dmlOptions);
+ builder.setJdbcExecutionOptions(executionOptions);
+ builder.setRowDataTypeInfo(rowDataTypeInformation);
+ builder.setFieldDataTypes(
+                DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]));
+ // audit
+ builder.setInlongMetric(inlongMetric);
+ builder.setAuditHostAndPorts(auditHostAndPorts);
+ builder.setAuditKeys(auditKeys);
+ return SinkFunctionProvider.of(
+                new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new JdbcDynamicTableSink(
+ jdbcOptions, executionOptions, dmlOptions, physicalRowDataType,
+ inlongMetric, auditHostAndPorts, auditKeys);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "JDBC:" + dialectName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JdbcDynamicTableSink)) {
+ return false;
+ }
+ JdbcDynamicTableSink that = (JdbcDynamicTableSink) o;
+ return Objects.equals(jdbcOptions, that.jdbcOptions)
+ && Objects.equals(executionOptions, that.executionOptions)
+ && Objects.equals(dmlOptions, that.dmlOptions)
+                && Objects.equals(physicalRowDataType, that.physicalRowDataType)
+ && Objects.equals(dialectName, that.dialectName)
+ && Objects.equals(inlongMetric, that.inlongMetric)
+ && Objects.equals(auditHostAndPorts, that.auditHostAndPorts)
+ && Objects.equals(auditKeys, that.auditKeys);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+                jdbcOptions, executionOptions, dmlOptions, physicalRowDataType, dialectName,
+ inlongMetric, auditHostAndPorts, auditKeys);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
new file mode 100644
index 0000000000..436b763ee5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.internal.JdbcOutputFormat;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder for {@link JdbcOutputFormat} for Table/SQL.
+ * Modified from {@link org.apache.flink.connector.jdbc.table.JdbcOutputFormatBuilder}.
+ */
+public class JdbcOutputFormatBuilder implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private InternalJdbcConnectionOptions jdbcOptions;
+ private JdbcExecutionOptions executionOptions;
+ private JdbcDmlOptions dmlOptions;
+ private TypeInformation<RowData> rowDataTypeInformation;
+ private DataType[] fieldDataTypes;
+
+ // audit
+ String inlongMetric;
+ String auditHostAndPorts;
+ String auditKeys;
+
+ public JdbcOutputFormatBuilder() {
+ }
+
+ public JdbcOutputFormatBuilder setJdbcOptions(InternalJdbcConnectionOptions jdbcOptions) {
+ this.jdbcOptions = jdbcOptions;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setJdbcExecutionOptions(JdbcExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions dmlOptions) {
+ this.dmlOptions = dmlOptions;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setRowDataTypeInfo(TypeInformation<RowData> rowDataTypeInfo) {
+ this.rowDataTypeInformation = rowDataTypeInfo;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) {
+ this.fieldDataTypes = fieldDataTypes;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setInlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setAuditHostAndPorts(String auditHostAndPorts) {
+ this.auditHostAndPorts = auditHostAndPorts;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setAuditKeys(String auditKeys) {
+ this.auditKeys = auditKeys;
+ return this;
+ }
+
+ public JdbcOutputFormat<RowData, ?, ?> build() {
+ checkNotNull(jdbcOptions, "jdbc options can not be null");
+ checkNotNull(dmlOptions, "jdbc dml options can not be null");
+ checkNotNull(executionOptions, "jdbc execution options can not be null");
+
+ final LogicalType[] logicalTypes =
+ Arrays.stream(fieldDataTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+ if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) {
+ // upsert query
+ return new JdbcOutputFormat<>(
+ new SimpleJdbcConnectionProvider(jdbcOptions),
+ executionOptions,
+ ctx -> createBufferReduceExecutor(
+ dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
+ JdbcOutputFormat.RecordExtractor.identity(),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ } else {
+ // append only query
+ final String sql =
+ dmlOptions
+ .getDialect()
+ .getInsertIntoStatement(
+ dmlOptions.getTableName(), dmlOptions.getFieldNames());
+ return new JdbcOutputFormat<>(
+ new SimpleJdbcConnectionProvider(jdbcOptions),
+ executionOptions,
+ ctx -> createSimpleBufferedExecutor(
+ ctx,
+ dmlOptions.getDialect(),
+ dmlOptions.getFieldNames(),
+ logicalTypes,
+ sql,
+ rowDataTypeInformation),
+ JdbcOutputFormat.RecordExtractor.identity(),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ }
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
+ JdbcDmlOptions opt,
+ RuntimeContext ctx,
+ TypeInformation<RowData> rowDataTypeInfo,
+ LogicalType[] fieldTypes) {
+ checkArgument(opt.getKeyFields().isPresent());
+ JdbcDialect dialect = opt.getDialect();
+ String tableName = opt.getTableName();
+ String[] pkNames = opt.getKeyFields().get();
+ int[] pkFields =
+ Arrays.stream(pkNames)
+ .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+ .toArray();
+ LogicalType[] pkTypes =
+ Arrays.stream(pkFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+ final TypeSerializer<RowData> typeSerializer =
+ rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+ final Function<RowData, RowData> valueTransform =
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? typeSerializer::copy
+ : Function.identity();
+
+ return new TableBufferReducedStatementExecutor(
+ createUpsertRowExecutor(
+ dialect,
+ tableName,
+ opt.getFieldNames(),
+ fieldTypes,
+ pkFields,
+ pkNames,
+ pkTypes),
+ createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
+ createRowKeyExtractor(fieldTypes, pkFields),
+ valueTransform);
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor(
+ RuntimeContext ctx,
+ JdbcDialect dialect,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ String sql,
+ TypeInformation<RowData> rowDataTypeInfo) {
+ final TypeSerializer<RowData> typeSerializer =
+ rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+ return new TableBufferedStatementExecutor(
+ createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql),
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? typeSerializer::copy
+ : Function.identity());
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(
+ JdbcDialect dialect,
+ String tableName,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ int[] pkFields,
+ String[] pkNames,
+ LogicalType[] pkTypes) {
+ return dialect.getUpsertStatement(tableName, fieldNames, pkNames)
+ .map(sql -> createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql))
+ .orElseGet(
+ () -> createInsertOrUpdateExecutor(
+ dialect,
+ tableName,
+ fieldNames,
+ fieldTypes,
+ pkFields,
+ pkNames,
+ pkTypes));
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createDeleteExecutor(
+ JdbcDialect dialect, String tableName, String[] pkNames, LogicalType[] pkTypes) {
+ String deleteSql = dialect.getDeleteStatement(tableName, pkNames);
+ return createSimpleRowExecutor(dialect, pkNames, pkTypes, deleteSql);
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor(
+ JdbcDialect dialect, String[] fieldNames, LogicalType[] fieldTypes, final String sql) {
+ final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes));
+ return new TableSimpleStatementExecutor(
+ connection -> FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames),
+ rowConverter);
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createInsertOrUpdateExecutor(
+ JdbcDialect dialect,
+ String tableName,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ int[] pkFields,
+ String[] pkNames,
+ LogicalType[] pkTypes) {
+ final String existStmt = dialect.getRowExistsStatement(tableName, pkNames);
+ final String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
+ final String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, pkNames);
+ return new TableInsertOrUpdateStatementExecutor(
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection, existStmt, pkNames),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection, insertStmt, fieldNames),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection, updateStmt, fieldNames),
+ dialect.getRowConverter(RowType.of(pkTypes)),
+ dialect.getRowConverter(RowType.of(fieldTypes)),
+ dialect.getRowConverter(RowType.of(fieldTypes)),
+ createRowKeyExtractor(fieldTypes, pkFields));
+ }
+
+ private static Function<RowData, RowData> createRowKeyExtractor(
+ LogicalType[] logicalTypes, int[] pkFields) {
+ final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[pkFields.length];
+ for (int i = 0; i < pkFields.length; i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes[pkFields[i]], pkFields[i]);
+ }
+ return row -> getPrimaryKey(row, fieldGetters);
+ }
+
+ private static RowData getPrimaryKey(RowData row, RowData.FieldGetter[] fieldGetters) {
+ GenericRowData pkRow = new GenericRowData(fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ pkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ return pkRow;
+ }
+}
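
Note: build() above picks the write path from JdbcDmlOptions: when key fields are present it creates the buffer-reduced upsert executor, otherwise a buffered append-only executor. A minimal wiring sketch (assumes jdbcOptions, dmlOptions, rowDataTypeInfo, fieldDataTypes and the audit strings are already built; uses the flink-connector-jdbc builder APIs shown in the imports; all values are illustrative):

    JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
            .withBatchSize(1000)
            .withBatchIntervalMs(200L)
            .withMaxRetries(3)
            .build();
    JdbcOutputFormat<RowData, ?, ?> format = new JdbcOutputFormatBuilder()
            .setJdbcOptions(jdbcOptions)               // InternalJdbcConnectionOptions: url, table, dialect, ...
            .setJdbcDmlOptions(dmlOptions)             // key fields present => upsert path
            .setJdbcExecutionOptions(executionOptions)
            .setRowDataTypeInfo(rowDataTypeInfo)
            .setFieldDataTypes(fieldDataTypes)
            .setInlongMetric(inlongMetric)             // InLong metric labels, e.g. "groupId&streamId&nodeId"
            .setAuditHostAndPorts(auditHostAndPorts)
            .setAuditKeys(auditKeys)
            .build();
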
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..a14e9cc440
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.jdbc.table.JdbcDynamicTableFactory
\ No newline at end of file
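
Note: this service entry is what lets Flink's table factory discovery find the connector at runtime. A hypothetical usage sketch (the connector identifier is defined by JdbcDynamicTableFactory#factoryIdentifier, assumed here to be "jdbc-inlong"; option keys and values are illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
            "CREATE TABLE jdbc_sink (\n"
                    + "  id BIGINT,\n"
                    + "  name STRING,\n"
                    + "  PRIMARY KEY (id) NOT ENFORCED\n"  // declared key => upsert executor
                    + ") WITH (\n"
                    + "  'connector' = 'jdbc-inlong',\n"
                    + "  'url' = 'jdbc:mysql://localhost:3306/test',\n"
                    + "  'table-name' = 'jdbc_sink',\n"
                    + "  'username' = 'root',\n"
                    + "  'password' = '***'\n"
                    + ")");
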
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
index a3fe402733..cf81e1a2ac 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
@@ -32,6 +32,7 @@
<modules>
<module>pulsar</module>
+ <module>jdbc</module>
</modules>
<properties>
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index bd944b384c..e0443b0fd6 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -838,6 +838,15 @@
Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.)
License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
+ 1.3.22 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+ Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software has been modified.)
+ License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
+
=======================================================================
Apache InLong Subcomponents: