This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 64b4c8246f3e050f57df159adc9acf2f3979738a
Author: Joao Boto <[email protected]>
AuthorDate: Mon Jul 8 15:39:02 2024 +0200

    [FLINK-35367] Reorganize sinks
---
 .../connector/jdbc/core/datastream/Jdbc.java}      | 21 +++++++++---------
 .../jdbc/{ => core/datastream}/sink/JdbcSink.java  | 14 ++++++------
 .../datastream}/sink/JdbcSinkBuilder.java          |  2 +-
 .../datastream}/sink/committer/JdbcCommitable.java |  2 +-
 .../sink/committer/JdbcCommitableSerializer.java   |  2 +-
 .../datastream}/sink/committer/JdbcCommitter.java  |  4 ++--
 .../datastream}/sink/writer/JdbcWriter.java        |  4 ++--
 .../datastream}/sink/writer/JdbcWriterState.java   |  2 +-
 .../sink/writer/JdbcWriterStateSerializer.java     |  2 +-
 .../datasource/transactions/xa/XaTransaction.java  |  2 +-
 .../apache/flink/connector/jdbc/JdbcITCase.java    | 22 ++++++++++++++++---
 .../datastream}/sink/AtLeastOnceJdbcSinkTest.java  |  2 +-
 .../datastream}/sink/BaseJdbcSinkTest.java         |  6 +++---
 .../datastream}/sink/ExactlyOnceJdbcSinkTest.java  |  2 +-
 .../sink/writer/AlLeastOnceJdbcWriterTest.java     |  4 ++--
 .../sink/writer/BaseJdbcWriterTest.java            | 10 ++++++---
 .../sink/writer/ExactlyOnceJdbcWriterTest.java     |  4 ++--
 .../sink/writer/JdbcWriterStateSerializerTest.java |  7 ++++--
 .../sink/writer/JdbcWriterStateTest.java           |  7 ++++--
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java        | 25 ++++++++++++++++++++--
 .../org/apache/flink/connector/jdbc/JdbcSink.java  |  5 +++--
 .../apache/flink/connector/jdbc/sink/JdbcSink.java | 14 ++++++------
 .../flink/connector/jdbc/sink/JdbcSinkBuilder.java |  8 ++++++-
 23 files changed, 113 insertions(+), 58 deletions(-)

diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java
similarity index 55%
copy from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java
index c0a31ea1..2e09c5f4 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink;
+package org.apache.flink.connector.jdbc.core.datastream;
 
-import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
-import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
+import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder;
 
-/** Smoke tests for the {@link JdbcSink} and the underlying classes. */
-class ExactlyOnceJdbcSinkTest extends BaseJdbcSinkTest {
+/** Facade to create JDBC stream sources and sinks. */
+@PublicEvolving
+public class Jdbc {
 
-    @Override
-    protected <T> JdbcSink<T> finishSink(JdbcSinkBuilder<T> builder) {
-        return builder.withExecutionOptions(
-                        
JdbcExecutionOptions.builder().withMaxRetries(0).build())
-                .buildExactlyOnce(
-                        JdbcExactlyOnceOptions.defaults(), 
getMetadata().getXaSourceSupplier());
+    /** Create a JDBC sink builder. */
+    public static <IN> JdbcSinkBuilder<IN> sinkBuilder() {
+        return JdbcSink.builder();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
similarity index 87%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
index 986fcba1..8c0cf999 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink;
+package org.apache.flink.connector.jdbc.core.datastream.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
@@ -25,15 +25,15 @@ import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitableSerializer;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitter;
+import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriter;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterStateSerializer;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitableSerializer;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitter;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriter;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterStateSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSinkBuilder.java
similarity index 98%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSinkBuilder.java
index ede9a00e..12c5a3e7 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSinkBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink;
+package org.apache.flink.connector.jdbc.core.datastream.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.base.DeliveryGuarantee;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitable.java
similarity index 93%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitable.java
index 0a04aab0..97594d5b 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitable.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.jdbc.sink.committer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.committer;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitableSerializer.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitableSerializer.java
index b950aced..56a4a21f 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitableSerializer.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.jdbc.sink.committer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.committer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitter.java
similarity index 93%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitter.java
index 1efa8133..a4576b84 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitter.java
@@ -1,14 +1,14 @@
-package org.apache.flink.connector.jdbc.sink.committer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.committer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.IOException;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java
index 2e167ddb..be7aa49c 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
@@ -32,7 +33,6 @@ import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.Transac
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterState.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterState.java
index 7c39827e..c5dece75 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterState.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializer.java
similarity index 97%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializer.java
index 89ba5a82..b11f6b45 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializer.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java
index 6fcc1eb6..48896842 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java
@@ -3,11 +3,11 @@ package 
org.apache.flink.connector.jdbc.datasource.transactions.xa;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState;
 import 
org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
index ead2c44f..3b4c60e5 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
@@ -19,8 +19,13 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
+import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.jupiter.api.Test;
@@ -46,7 +51,7 @@ import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Smoke tests for the {@link JdbcSink} and the underlying classes. */
+/** Smoke tests for the {@link GenericJdbcSinkFunction} and the underlying 
classes. */
 public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase {
 
     public static final JdbcStatementBuilder<TestEntry> 
TEST_ENTRY_JDBC_STATEMENT_BUILDER =
@@ -69,7 +74,7 @@ public class JdbcITCase extends JdbcTestBase implements 
JdbcITCaseBase {
         env.setParallelism(1);
         env.fromElements(TEST_DATA)
                 .addSink(
-                        JdbcSink.sink(
+                        sink(
                                 String.format(INSERT_TEMPLATE, INPUT_TABLE),
                                 TEST_ENTRY_JDBC_STATEMENT_BUILDER,
                                 new JdbcConnectionOptionsBuilder()
@@ -100,7 +105,7 @@ public class JdbcITCase extends JdbcTestBase implements 
JdbcITCaseBase {
                             return reused;
                         })
                 .addSink(
-                        JdbcSink.sink(
+                        sink(
                                 JdbcTestFixture.INSERT_INTO_WORDS_TEMPLATE,
                                 (ps, e) -> {
                                     ps.setInt(1, counter.getAndIncrement());
@@ -172,4 +177,15 @@ public class JdbcITCase extends JdbcTestBase implements 
JdbcITCaseBase {
         T value = get.apply(rs);
         return rs.wasNull() ? null : value;
     }
+
+    private <T> SinkFunction<T> sink(
+            String sql,
+            JdbcStatementBuilder<T> statementBuilder,
+            JdbcConnectionOptions connectionOptions) {
+        return new GenericJdbcSinkFunction<>(
+                new JdbcOutputFormat<>(
+                        new SimpleJdbcConnectionProvider(connectionOptions),
+                        JdbcExecutionOptions.defaults(),
+                        () -> JdbcBatchStatementExecutor.simple(sql, 
statementBuilder)));
+    }
 }
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/AtLeastOnceJdbcSinkTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/AtLeastOnceJdbcSinkTest.java
similarity index 96%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/AtLeastOnceJdbcSinkTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/AtLeastOnceJdbcSinkTest.java
index e3a9d28d..7bf7afbc 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/AtLeastOnceJdbcSinkTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/AtLeastOnceJdbcSinkTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink;
+package org.apache.flink.connector.jdbc.core.datastream.sink;
 
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/BaseJdbcSinkTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
similarity index 96%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/BaseJdbcSinkTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
index 7224647a..219b374c 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/BaseJdbcSinkTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink;
+package org.apache.flink.connector.jdbc.core.datastream.sink;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
@@ -37,8 +37,8 @@ import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Smoke tests for the {@link org.apache.flink.connector.jdbc.sink.JdbcSink} 
and the underlying
- * classes.
+ * Smoke tests for the {@link 
org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink} and the
+ * underlying classes.
  */
 public abstract class BaseJdbcSinkTest implements DerbyTestBase {
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/ExactlyOnceJdbcSinkTest.java
similarity index 95%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/ExactlyOnceJdbcSinkTest.java
index c0a31ea1..f117d0f6 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/ExactlyOnceJdbcSinkTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.sink;
+package org.apache.flink.connector.jdbc.core.datastream.sink;
 
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/AlLeastOnceJdbcWriterTest.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/AlLeastOnceJdbcWriterTest.java
index b22b1b1f..59fb1e4a 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/AlLeastOnceJdbcWriterTest.java
@@ -1,11 +1,11 @@
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java
index 914a2fbf..cfcc2030 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -6,12 +6,12 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
 import 
org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
 import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 import org.apache.flink.util.StringUtils;
@@ -29,7 +29,11 @@ import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doReturn;
 
-/** Base smoke tests for the {@link JdbcWriter} and the underlying classes. */
+/**
+ * Base smoke tests for the {@link
+ * org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriter} and 
the underlying
+ * classes.
+ */
 abstract class BaseJdbcWriterTest implements DerbyTestBase {
 
     private static final String JOBID = "6b64d8a9a951e2e8767ae952ad951706";
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/ExactlyOnceJdbcWriterTest.java
similarity index 95%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/ExactlyOnceJdbcWriterTest.java
index ab0153d2..409879a2 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/ExactlyOnceJdbcWriterTest.java
@@ -1,11 +1,11 @@
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializerTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializerTest.java
similarity index 85%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializerTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializerTest.java
index 7b8c8e91..8bd14f84 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializerTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializerTest.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
@@ -10,7 +10,10 @@ import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Smoke test for {@link JdbcWriterStateSerializer}. */
+/**
+ * Smoke test for {@link
+ * 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterStateSerializer}.
+ */
 class JdbcWriterStateSerializerTest {
 
     @Test
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateTest.java
similarity index 86%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateTest.java
index f454f8be..889a7eed 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateTest.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.jdbc.sink.writer;
+package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
@@ -9,7 +9,10 @@ import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Smoke test for {@link JdbcWriterState}. */
+/**
+ * Smoke test for {@link
+ * 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState}.
+ */
 class JdbcWriterStateTest {
 
     @Test
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index 85db5e37..eb8569db 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
 import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
@@ -35,10 +35,12 @@ import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -46,6 +48,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.sql.XADataSource;
+
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -140,7 +144,7 @@ public abstract class JdbcExactlyOnceSinkE2eTest implements 
DatabaseTest {
                 .setParallelism(PARALLELISM)
                 .map(new FailingMapper(numElementsPerCheckpoint + 
(numElementsPerCheckpoint / 2)))
                 .addSink(
-                        JdbcSink.exactlyOnceSink(
+                        exactlyOnceSink(
                                 OUTPUT_TABLE.getInsertIntoQuery(),
                                 OUTPUT_TABLE.getStatementBuilder(),
                                 
JdbcExecutionOptions.builder().withMaxRetries(0).build(),
@@ -166,6 +170,23 @@ public abstract class JdbcExactlyOnceSinkE2eTest 
implements DatabaseTest {
                 System.currentTimeMillis() - started);
     }
 
+    private <T> SinkFunction<T> exactlyOnceSink(
+            String sql,
+            JdbcStatementBuilder<T> statementBuilder,
+            JdbcExecutionOptions executionOptions,
+            JdbcExactlyOnceOptions exactlyOnceOptions,
+            SerializableSupplier<XADataSource> dataSourceSupplier) {
+        return new JdbcXaSinkFunction<>(
+                sql,
+                statementBuilder,
+                XaFacade.fromXaDataSourceSupplier(
+                        dataSourceSupplier,
+                        exactlyOnceOptions.getTimeoutSec(),
+                        exactlyOnceOptions.isTransactionPerConnection()),
+                executionOptions,
+                exactlyOnceOptions);
+    }
+
     /**
      * {@link SourceFunction} emits {@link BooksTable.BookEntry test entries} 
and waits for the
      * checkpoint.
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
similarity index 96%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
index efd976e2..a46601f6 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java
@@ -18,11 +18,11 @@
 package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
-import org.apache.flink.connector.jdbc.sink.JdbcSinkBuilder;
 import org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction;
 import org.apache.flink.connector.jdbc.xa.XaFacade;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -32,6 +32,7 @@ import javax.sql.XADataSource;
 
 /** Facade to create JDBC {@link SinkFunction sinks}. */
 @PublicEvolving
+@Deprecated
 public class JdbcSink {
 
     /**
@@ -109,7 +110,7 @@ public class JdbcSink {
     }
 
     public static <IN> JdbcSinkBuilder<IN> builder() {
-        return org.apache.flink.connector.jdbc.sink.JdbcSink.builder();
+        return 
org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink.builder();
     }
 
     private JdbcSink() {}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
similarity index 87%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
index 986fcba1..a1f8a4b4 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java
@@ -25,15 +25,15 @@ import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitableSerializer;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitter;
+import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriter;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState;
+import 
org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterStateSerializer;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitableSerializer;
-import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitter;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriter;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState;
-import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterStateSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
@@ -44,8 +44,10 @@ import java.util.Collections;
  * Flink Sink to produce data into a jdbc database.
  *
  * @see JdbcSinkBuilder on how to construct a JdbcSink
+ * @deprecated please use {@link 
org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink}
  */
 @PublicEvolving
+@Deprecated
 public class JdbcSink<IN>
         implements StatefulSink<IN, JdbcWriterState>, 
TwoPhaseCommittingSink<IN, JdbcCommitable> {
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
similarity index 96%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
index ede9a00e..3b073568 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java
@@ -36,8 +36,14 @@ import javax.sql.XADataSource;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** Builder to construct {@link JdbcSink}. */
+/**
+ * Builder to construct {@link JdbcSink}.
+ *
+ * @deprecated please use {@link
+ *     org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder}
+ */
 @PublicEvolving
+@Deprecated
 public class JdbcSinkBuilder<IN> {
 
     private JdbcExecutionOptions executionOptions;


Reply via email to