This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 231542bbdbe96ecd3bfcc3f6c2f4c8aa2bc56306
Author: Joao Boto <[email protected]>
AuthorDate: Mon Jul 8 16:14:38 2024 +0200

    [FLINK-35494] Reorganize sources
---
 .../flink/connector/jdbc/JdbcInputFormat.java      |  7 +++--
 .../flink/connector/jdbc/core/datastream/Jdbc.java |  7 +++++
 .../{ => core/datastream}/source/JdbcSource.java   | 20 ++++++-------
 .../datastream}/source/JdbcSourceBuilder.java      | 29 ++++++++----------
 .../datastream}/source/JdbcSourceOptions.java      |  2 +-
 .../enumerator/JdbcSourceEnumStateSerializer.java  |  6 ++--
 .../source/enumerator/JdbcSourceEnumerator.java    |  4 +--
 .../enumerator/JdbcSourceEnumeratorState.java      |  4 +--
 .../enumerator/JdbcSqlSplitEnumeratorBase.java     |  4 +--
 .../enumerator/SqlTemplateSplitEnumerator.java     |  6 ++--
 .../source/reader/JdbcRecordEmitter.java           |  6 ++--
 .../source/reader/JdbcSourceReader.java            |  6 ++--
 .../source/reader/JdbcSourceSplitReader.java       | 16 +++++-----
 .../datastream}/source/reader/RecordAndOffset.java |  4 +--
 .../source/reader/extractor/ResultExtractor.java   |  2 +-
 .../reader/extractor/RowResultExtractor.java       |  2 +-
 .../source/split/CheckpointedOffset.java           |  2 +-
 .../datastream}/source/split/JdbcSourceSplit.java  |  2 +-
 .../source/split/JdbcSourceSplitSerializer.java    |  2 +-
 .../source/split/JdbcSourceSplitState.java         |  2 +-
 .../jdbc/table/RowDataResultExtractor.java         |  2 +-
 .../reader/splitreader/TestingSplitsChange.java    |  2 +-
 .../flink/connector/jdbc/JdbcDataTestBase.java     |  2 +-
 .../datastream}/source/JdbcSourceBuilderTest.java  | 12 ++++----
 .../datastream}/source/JdbcSourceITCase.java       |  7 +++--
 .../source/JdbcSourceStreamRelatedITCase.java      |  9 ++++--
 .../JdbcSourceEnumStateSerializerTest.java         | 13 +++++----
 .../enumerator/JdbcSourceEnumeratorTest.java       | 11 ++++---
 .../source/reader/JdbcSourceReaderTest.java        | 10 ++++---
 .../source/reader/JdbcSourceSplitReaderTest.java   | 11 ++++---
 .../split/JdbcSourceSplitSerializerTest.java       |  7 +++--
 .../flink/connector/jdbc/source/JdbcSource.java    | 25 +++++++++-------
 .../connector/jdbc/source/JdbcSourceBuilder.java   | 33 ++++++++++-----------
 .../source/reader/extractor/ResultExtractor.java   | 34 +++++-----------------
 34 files changed, 159 insertions(+), 152 deletions(-)

diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
index 20e1a2c7..dacf3c6c 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
@@ -99,9 +101,8 @@ import java.util.Arrays;
  * @see JdbcParameterValuesProvider
  * @see PreparedStatement
  * @see DriverManager
- * @deprecated Please use {@link 
org.apache.flink.connector.jdbc.source.JdbcSource} instead. The
- *     builder utils and parameters passing could be view {@link
- *     org.apache.flink.connector.jdbc.source.JdbcSourceBuilder}.
+ * @deprecated Please use {@link JdbcSource} instead. The builder utils and 
parameters passing could
+ *     be view {@link JdbcSourceBuilder}.
  */
 @Deprecated
 @Experimental
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java
index 2e09c5f4..433a2626 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java
@@ -20,11 +20,18 @@ package org.apache.flink.connector.jdbc.core.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
 import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
 
 /** Facade to create JDBC stream sources and sinks. */
 @PublicEvolving
 public class Jdbc {
 
+    /** Create a JDBC source builder. */
+    public static <OUT> JdbcSourceBuilder<OUT> sourceBuilder() {
+        return JdbcSource.builder();
+    }
+
     /** Create a JDBC sink builder. */
     public static <IN> JdbcSinkBuilder<IN> sinkBuilder() {
         return JdbcSink.builder();
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
similarity index 89%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
index 08e4a777..eef58c07 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source;
+package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -30,16 +30,16 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumStateSerializer;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
-import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
-import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
-import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
-import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
-import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
-import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
 import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java
similarity index 91%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java
index 8d52f7ac..451082a0 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source;
+package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
-import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
 import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
@@ -46,12 +46,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Objects;
 
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
-
 /**
  * A tool is used to build {@link JdbcSource} quickly.
  *
@@ -130,9 +124,9 @@ public class JdbcSourceBuilder<OUT> {
     JdbcSourceBuilder() {
         this.configuration = new Configuration();
         this.connOptionsBuilder = new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
-        this.splitReaderFetchBatchSize = 
READER_FETCH_BATCH_SIZE.defaultValue();
-        this.resultSetType = RESULTSET_TYPE.defaultValue();
-        this.resultSetConcurrency = RESULTSET_CONCURRENCY.defaultValue();
+        this.splitReaderFetchBatchSize = 
JdbcSourceOptions.READER_FETCH_BATCH_SIZE.defaultValue();
+        this.resultSetType = JdbcSourceOptions.RESULTSET_TYPE.defaultValue();
+        this.resultSetConcurrency = 
JdbcSourceOptions.RESULTSET_CONCURRENCY.defaultValue();
         this.deliveryGuarantee = DeliveryGuarantee.NONE;
         // Boolean to distinguish between default value and explicitly set 
autoCommit mode.
         this.autoCommit = true;
@@ -275,7 +269,7 @@ public class JdbcSourceBuilder<OUT> {
     public JdbcSource<OUT> build() {
         this.connectionProvider = new 
SimpleJdbcConnectionProvider(connOptionsBuilder.build());
         if (resultSetFetchSize > 0) {
-            this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize);
+            this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, 
resultSetFetchSize);
         }
 
         if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
@@ -286,10 +280,11 @@ public class JdbcSourceBuilder<OUT> {
                     DeliveryGuarantee.EXACTLY_ONCE);
         }
 
-        this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency);
-        this.configuration.set(RESULTSET_TYPE, resultSetType);
-        this.configuration.set(READER_FETCH_BATCH_SIZE, 
splitReaderFetchBatchSize);
-        this.configuration.set(AUTO_COMMIT, autoCommit);
+        this.configuration.set(JdbcSourceOptions.RESULTSET_CONCURRENCY, 
resultSetConcurrency);
+        this.configuration.set(JdbcSourceOptions.RESULTSET_TYPE, 
resultSetType);
+        this.configuration.set(
+                JdbcSourceOptions.READER_FETCH_BATCH_SIZE, 
splitReaderFetchBatchSize);
+        this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit);
 
         Preconditions.checkState(
                 !StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be 
null or empty.");
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceOptions.java
similarity index 97%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceOptions.java
index c83ad7b5..294be051 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceOptions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source;
+package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializer.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializer.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializer.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializer.java
index 2d2283ac..2be1deb1 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializer.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializer.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.enumerator;
+package org.apache.flink.connector.jdbc.core.datastream.source.enumerator;
 
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java
index a26905fc..9ebf389a 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.enumerator;
+package org.apache.flink.connector.jdbc.core.datastream.source.enumerator;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.util.Preconditions;
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorState.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorState.java
similarity index 95%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorState.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorState.java
index 2cd3903c..1c7d5ad4 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorState.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorState.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.enumerator;
+package org.apache.flink.connector.jdbc.core.datastream.source.enumerator;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java
similarity index 95%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java
index 25d09a71..dd3eb34b 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.enumerator;
+package org.apache.flink.connector.jdbc.core.datastream.source.enumerator;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java
similarity index 95%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java
index 2099d0e0..b4565d95 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.enumerator;
+package org.apache.flink.connector.jdbc.core.datastream.source.enumerator;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcRecordEmitter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcRecordEmitter.java
similarity index 86%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcRecordEmitter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcRecordEmitter.java
index 803f7f3c..d707c68d 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcRecordEmitter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcRecordEmitter.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader;
+package org.apache.flink.connector.jdbc.core.datastream.source.reader;
 
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitState;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitState;
 
 /**
  * The JDBC resorce emitter.
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReader.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReader.java
similarity index 91%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReader.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReader.java
index 7d94d271..3a5edeb7 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReader.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReader.java
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader;
+package org.apache.flink.connector.jdbc.core.datastream.source.reader;
 
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitState;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitState;
 
 import java.util.Map;
 import java.util.function.Supplier;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java
index 5011f42b..7e1af42d 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader;
+package org.apache.flink.connector.jdbc.core.datastream.source.reader;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,9 +29,9 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -51,11 +51,11 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+import static 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_TYPE;
 
 /**
  * The JDBC source reader to read data from jdbc splits.
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/RecordAndOffset.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/RecordAndOffset.java
similarity index 91%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/RecordAndOffset.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/RecordAndOffset.java
index eaf0981b..9b0d2861 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/RecordAndOffset.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/RecordAndOffset.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader;
+package org.apache.flink.connector.jdbc.core.datastream.source.reader;
 
-import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset;
 
 /**
  * Util class to represent the record with the corresponding information.
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/ResultExtractor.java
similarity index 95%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/ResultExtractor.java
index 19bd6672..5a2a0e7b 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/ResultExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader.extractor;
+package 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/RowResultExtractor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowResultExtractor.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/RowResultExtractor.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowResultExtractor.java
index 6b0d414e..cc050acc 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/RowResultExtractor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowResultExtractor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader.extractor;
+package 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/CheckpointedOffset.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/CheckpointedOffset.java
similarity index 97%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/CheckpointedOffset.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/CheckpointedOffset.java
index e6fa7572..b49a2b22 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/CheckpointedOffset.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/CheckpointedOffset.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.split;
+package org.apache.flink.connector.jdbc.core.datastream.source.split;
 
 import org.apache.flink.annotation.PublicEvolving;
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplit.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplit.java
index 7a690ca9..f57169ba 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplit.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.split;
+package org.apache.flink.connector.jdbc.core.datastream.source.split;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceSplit;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializer.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializer.java
index 3f6afb6e..86775fbf 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.split;
+package org.apache.flink.connector.jdbc.core.datastream.source.split;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.InstantiationUtil;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitState.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitState.java
index 2e3b2c3d..98d523b2 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitState.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.split;
+package org.apache.flink.connector.jdbc.core.datastream.source.split;
 
 import org.apache.flink.util.FlinkRuntimeException;
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
index 2bb65b80..534f4921 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.table;
 
 import 
org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java
index 97d2f7dd..962cdd81 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.splitreader;
 
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 
 import java.util.List;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
index d61100ef..808c7124 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.connector.jdbc;
 
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.derby.testutils.DerbyMetadata;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java
similarity index 93%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java
index a760749a..36130033 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source;
+package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import 
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import 
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
@@ -34,12 +34,12 @@ import org.junit.jupiter.api.Test;
 import java.io.Serializable;
 import java.time.Duration;
 
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link JdbcSourceBuilder}. */
+/** Test for {@link 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder}. */
 class JdbcSourceBuilderTest {
 
     private final String emptySql = "";
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
similarity index 97%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceITCase.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
index 845cfde5..2021094e 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source;
+package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -39,7 +39,10 @@ import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** The Integration test for {@link JdbcSource}. */
+/**
+ * The Integration test for {@link
+ * org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource}.
+ */
 class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase {
 
     public static Queue<TestEntry> collectedRecords;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java
index 2be8b404..445f6ba9 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source;
+package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -27,8 +27,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
 import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
 import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
@@ -65,7 +65,10 @@ import java.util.function.Supplier;
 import static 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for streaming semantic related cases of {@link JdbcSource}. */
+/**
+ * Test for streaming semantic related cases of {@link
+ * org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource}.
+ */
 class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase {
 
     private static final long ONE_SECOND = Duration.ofSeconds(1L).toMillis();
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializerTest.java
similarity index 88%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializerTest.java
index ce7548d2..6bc9cee7 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializerTest.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.enumerator;
+package org.apache.flink.connector.jdbc.core.datastream.source.enumerator;
 
-import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
 
 import org.junit.jupiter.api.Test;
 
@@ -32,7 +32,10 @@ import java.util.Random;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link JdbcSourceEnumStateSerializer}. */
+/**
+ * Test for {@link
+ * 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumStateSerializer}.
+ */
 class JdbcSourceEnumStateSerializerTest {
 
     private final JdbcSourceEnumeratorState state =
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java
similarity index 91%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java
index 443fb05b..268fb73f 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.enumerator;
+package org.apache.flink.connector.jdbc.core.datastream.source.enumerator;
 
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -38,7 +38,10 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link JdbcSourceEnumerator}. */
+/**
+ * Test for {@link
+ * 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator}.
+ */
 class JdbcSourceEnumeratorTest {
 
     private static long splitId = 1L;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReaderTest.java
similarity index 90%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReaderTest.java
index f549869f..ab6cc327 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReaderTest.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader;
+package org.apache.flink.connector.jdbc.core.datastream.source.reader;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 
 import org.junit.jupiter.api.Test;
@@ -34,7 +34,9 @@ import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link JdbcSourceReader}. */
+/**
+ * Test for {@link 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader}.
+ */
 class JdbcSourceReaderTest extends JdbcDataTestBase {
 
     @Test
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java
index 7ebe0e12..b28238f9 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.reader;
+package org.apache.flink.connector.jdbc.core.datastream.source.reader;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
@@ -26,10 +26,10 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import 
org.apache.flink.connector.base.source.reader.splitreader.TestingSplitsChange;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 
 import org.junit.jupiter.api.Test;
@@ -44,7 +44,10 @@ import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link JdbcSourceSplitReader}. */
+/**
+ * Test for {@link
+ * 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader}.
+ */
 class JdbcSourceSplitReaderTest extends JdbcDataTestBase {
 
     private final JdbcSourceSplit split =
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializerTest.java
similarity index 94%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializerTest.java
index 9d3f6516..e414da6d 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.source.split;
+package org.apache.flink.connector.jdbc.core.datastream.source.split;
 
 import org.junit.jupiter.api.Test;
 
@@ -27,7 +27,10 @@ import java.util.Random;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link JdbcSourceSplitSerializer}. */
+/**
+ * Test for {@link
+ * 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer}.
+ */
 class JdbcSourceSplitSerializerTest {
 
     private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", 
null, null);
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
similarity index 88%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
index 08e4a777..19405624 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
@@ -30,16 +30,16 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumStateSerializer;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
-import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
-import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
-import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
-import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
-import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
-import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
-import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
 import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
@@ -50,8 +50,13 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Objects;
 
-/** JDBC source. */
+/**
+ * JDBC source.
+ *
+ * @deprecated please use {@link 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource}
+ */
 @PublicEvolving
+@Deprecated
 public class JdbcSource<OUT>
         implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
                 ResultTypeQueryable<OUT> {
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
similarity index 90%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
index 8d52f7ac..7e940e92 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
@@ -23,10 +23,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
+import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
-import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
 import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
@@ -46,12 +47,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Objects;
 
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
-
 /**
  * A tool is used to build {@link JdbcSource} quickly.
  *
@@ -94,8 +89,11 @@ import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET
  * @see PreparedStatement
  * @see DriverManager
  * @see JdbcSource
+ * @deprecated please use {@link
+ *     
org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder}
  */
 @PublicEvolving
+@Deprecated
 public class JdbcSourceBuilder<OUT> {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceBuilder.class);
@@ -130,9 +128,9 @@ public class JdbcSourceBuilder<OUT> {
     JdbcSourceBuilder() {
         this.configuration = new Configuration();
         this.connOptionsBuilder = new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
-        this.splitReaderFetchBatchSize = 
READER_FETCH_BATCH_SIZE.defaultValue();
-        this.resultSetType = RESULTSET_TYPE.defaultValue();
-        this.resultSetConcurrency = RESULTSET_CONCURRENCY.defaultValue();
+        this.splitReaderFetchBatchSize = 
JdbcSourceOptions.READER_FETCH_BATCH_SIZE.defaultValue();
+        this.resultSetType = JdbcSourceOptions.RESULTSET_TYPE.defaultValue();
+        this.resultSetConcurrency = 
JdbcSourceOptions.RESULTSET_CONCURRENCY.defaultValue();
         this.deliveryGuarantee = DeliveryGuarantee.NONE;
         // Boolean to distinguish between default value and explicitly set 
autoCommit mode.
         this.autoCommit = true;
@@ -191,7 +189,7 @@ public class JdbcSourceBuilder<OUT> {
     /**
      * The continuousUnBoundingSettings to discovery the next available batch 
splits. Note: If the
      * value was set, the {@link #jdbcParameterValuesProvider} must specified 
with the {@link
-     * org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider}.
+     * JdbcSlideTimingParameterProvider}.
      */
     public JdbcSourceBuilder<OUT> setContinuousUnBoundingSettings(
             ContinuousUnBoundingSettings continuousUnBoundingSettings) {
@@ -275,7 +273,7 @@ public class JdbcSourceBuilder<OUT> {
     public JdbcSource<OUT> build() {
         this.connectionProvider = new 
SimpleJdbcConnectionProvider(connOptionsBuilder.build());
         if (resultSetFetchSize > 0) {
-            this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize);
+            this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, 
resultSetFetchSize);
         }
 
         if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
@@ -286,10 +284,11 @@ public class JdbcSourceBuilder<OUT> {
                     DeliveryGuarantee.EXACTLY_ONCE);
         }
 
-        this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency);
-        this.configuration.set(RESULTSET_TYPE, resultSetType);
-        this.configuration.set(READER_FETCH_BATCH_SIZE, 
splitReaderFetchBatchSize);
-        this.configuration.set(AUTO_COMMIT, autoCommit);
+        this.configuration.set(JdbcSourceOptions.RESULTSET_CONCURRENCY, 
resultSetConcurrency);
+        this.configuration.set(JdbcSourceOptions.RESULTSET_TYPE, 
resultSetType);
+        this.configuration.set(
+                JdbcSourceOptions.READER_FETCH_BATCH_SIZE, 
splitReaderFetchBatchSize);
+        this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit);
 
         Preconditions.checkState(
                 !StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be 
null or empty.");
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
similarity index 57%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
index 19bd6672..79935080 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java
@@ -19,39 +19,19 @@
 package org.apache.flink.connector.jdbc.source.reader.extractor;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.types.Row;
 
-import java.io.Serializable;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 
 /**
  * The Extractor to extract the data from {@link ResultSet}.
  *
  * @param <T> The target data type.
+ * @deprecated please use {@link
+ *     
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor}
  */
 @PublicEvolving
-public interface ResultExtractor<T> extends Serializable {
-
-    /**
-     * Extract the data from the current point line of the result.
-     *
-     * @param resultSet Result set queried from a sql.
-     * @return The data object filled by the current line of the resultSet.
-     * @throws SQLException SQL exception.
-     */
-    T extract(ResultSet resultSet) throws SQLException;
-
-    /**
-     * The identifier of the extractor.
-     *
-     * @return identifier in {@link String} type.
-     */
-    default String identifier() {
-        return this.getClass().getSimpleName();
-    }
-
-    static ResultExtractor<Row> ofRowResultExtractor() {
-        return new RowResultExtractor();
-    }
-}
+@Deprecated
+public interface ResultExtractor<T>
+        extends 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor
+                        .ResultExtractor<
+                T> {}

Reply via email to