This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 67046e0aabd9a36f000f6f347554a20c019dfc21
Author: Joao Boto <[email protected]>
AuthorDate: Mon Jul 8 18:32:57 2024 +0200

    [FLINK-35368] Reorganize table code
---
 .../core/database/catalog/AbstractJdbcCatalog.java | 12 +++---
 .../catalog/factory/JdbcCatalogFactoryOptions.java |  2 +-
 .../{ => core}/table/JdbcConnectorOptions.java     |  4 +-
 .../{ => core}/table/JdbcDynamicTableFactory.java  | 46 +++++++++++-----------
 .../table/sink}/JdbcDynamicTableSink.java          |  2 +-
 .../table/sink}/JdbcOutputFormatBuilder.java       |  2 +-
 .../table/source}/JdbcDynamicTableSource.java      |  2 +-
 ...JdbcFilterPushdownPreparedStatementVisitor.java |  2 +-
 .../table/source}/JdbcRowDataInputFormat.java      |  2 +-
 .../table/source}/JdbcRowDataLookupFunction.java   |  2 +-
 .../table/source}/ParameterizedPredicate.java      |  2 +-
 .../org.apache.flink.table.factories.Factory       |  2 +-
 .../table/JdbcDynamicTableFactoryTest.java         |  4 +-
 .../jdbc/{ => core}/table/JdbcTablePlanTest.java   |  2 +-
 .../table/sink}/JdbcAppendOnlyWriterTest.java      |  2 +-
 .../table/sink}/JdbcDynamicTableSinkITCase.java    |  2 +-
 .../table/sink}/JdbcOutputFormatTest.java          |  2 +-
 .../source}/JdbcDynamicTableSourceITCase.java      |  2 +-
 ...FilterPushdownPreparedStatementVisitorTest.java |  2 +-
 .../table/source}/JdbcLookupTestBase.java          |  2 +-
 .../table/source}/JdbcRowDataInputFormatTest.java  |  2 +-
 .../source}/JdbcRowDataLookupFunctionTest.java     |  2 +-
 .../derby/table/DerbyDynamicTableSinkITCase.java   |  2 +-
 .../derby/table/DerbyDynamicTableSourceITCase.java |  2 +-
 .../jdbc/{ => core}/table/JdbcTablePlanTest.xml    |  0
 .../database/table/Db2DynamicTableSinkITCase.java  |  2 +-
 .../table/Db2DynamicTableSourceITCase.java         |  2 +-
 .../mysql/table/MySqlDynamicTableSinkITCase.java   |  2 +-
 .../mysql/table/MySqlDynamicTableSourceITCase.java |  2 +-
 .../OceanBaseMySqlDynamicTableSinkITCase.java      |  2 +-
 .../OceanBaseMySqlDynamicTableSourceITCase.java    |  2 +-
 .../OceanBaseOracleDynamicTableSinkITCase.java     |  2 +-
 .../OceanBaseOracleDynamicTableSourceITCase.java   |  2 +-
 .../oracle/table/OracleDynamicTableSinkITCase.java |  2 +-
 .../table/OracleDynamicTableSourceITCase.java      |  2 +-
 .../table/PostgresDynamicTableSinkITCase.java      |  2 +-
 .../table/PostgresDynamicTableSourceITCase.java    |  2 +-
 .../table/SqlServerDynamicTableSinkITCase.java     |  2 +-
 .../table/SqlServerDynamicTableSourceITCase.java   |  2 +-
 .../trino/table/TrinoDynamicTableSinkITCase.java   |  2 +-
 .../trino/table/TrinoDynamicTableSourceITCase.java |  2 +-
 flink-connector-jdbc/pom.xml                       |  8 ++++
 .../connector/jdbc/table/JdbcConnectorOptions.java | 31 +++++----------
 43 files changed, 89 insertions(+), 88 deletions(-)

diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
index 63d1e90f..7f859145 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.core.database.catalog;
 
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcDynamicTableFactory;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.AbstractCatalog;
@@ -77,11 +77,11 @@ import java.util.function.Predicate;
 import static 
org.apache.flink.connector.jdbc.JdbcConnectionOptions.PASSWORD_KEY;
 import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.USER_KEY;
 import static 
org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
-import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
-import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.TABLE_NAME;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcDynamicTableFactory.IDENTIFIER;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
index 08faa228..02c607b4 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/factory/JdbcCatalogFactoryOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
-import org.apache.flink.connector.jdbc.table.JdbcConnectorOptions;
+import org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 
 /** {@link ConfigOption}s for {@link JdbcCatalog}. */
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
index 0b39df45..35c60232 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
@@ -179,5 +179,5 @@ public class JdbcConnectorOptions {
                     .defaultValue(3)
                     .withDescription("The max retry times if writing records 
to database failed.");
 
-    private JdbcConnectorOptions() {}
+    protected JdbcConnectorOptions() {}
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
similarity index 87%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
index 4e427027..56a0dcaa 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSink;
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSource;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
@@ -50,27 +52,27 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.COMPATIBLE_MODE;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_MAX_RETRIES;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.MAX_RETRY_TIMEOUT;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_FETCH_SIZE;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_NUM;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_MAX_RETRIES;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_PARALLELISM;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
-import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
-import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.COMPATIBLE_MODE;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.DRIVER;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_MAX_RETRIES;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.MAX_RETRY_TIMEOUT;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_NUM;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_MAX_RETRIES;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SINK_PARALLELISM;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.TABLE_NAME;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.USERNAME;
 import static 
org.apache.flink.connector.jdbc.utils.JdbcUtils.getConnectionProperties;
 
 /**
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java
index 4403f194..7b2c2f76 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java
index 1bd2fc5d..493cc8d2 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatBuilder.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.sink;
 
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
index 08716bc8..11bdd226 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcFilterPushdownPreparedStatementVisitor.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcFilterPushdownPreparedStatementVisitor.java
index a4e31eee..908beeb4 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcFilterPushdownPreparedStatementVisitor.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.table.expressions.CallExpression;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
index 812c1891..b06f555d 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
index 9bbe8939..dd39fa27 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/ParameterizedPredicate.java
similarity index 96%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/ParameterizedPredicate.java
index 82a9a856..483f3bca 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/ParameterizedPredicate.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Experimental;
 
diff --git 
a/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index b0c68772..b245d318 100644
--- 
a/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
+org.apache.flink.connector.jdbc.core.table.JdbcDynamicTableFactory
 
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
index a23bf197..e9bf7022 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
@@ -16,10 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSink;
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSource;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
index c342d6ba..ad4558d4 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcAppendOnlyWriterTest.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcAppendOnlyWriterTest.java
index 17ea9260..bec03d73 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcAppendOnlyWriterTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.sink;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java
index 2d4735c6..f95fd3c9 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.sink;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
index fff4ad7a..32618d92 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.sink;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
index bab5a408..b05a038c 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcFilterPushdownPreparedStatementVisitorTest.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcFilterPushdownPreparedStatementVisitorTest.java
index 1f5a0493..752c25da 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcLookupTestBase.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcLookupTestBase.java
index 1c8cbdd9..7255538f 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcLookupTestBase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormatTest.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormatTest.java
index 259c3746..93f32490 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormatTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunctionTest.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunctionTest.java
index 08d7d8fe..16f3937a 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunctionTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.table;
+package org.apache.flink.connector.jdbc.core.table.source;
 
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.table.api.DataTypes;
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSinkITCase.java
index 6912bae7..8242fb3f 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSinkITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.derby.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 import org.apache.flink.connector.jdbc.derby.database.dialect.DerbyDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 
 /** The Table Sink ITCase for {@link DerbyDialect}. */
 class DerbyDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase 
implements DerbyTestBase {}
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSourceITCase.java
index 5d201227..e1d95c19 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/table/DerbyDynamicTableSourceITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.derby.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 import org.apache.flink.connector.jdbc.derby.database.dialect.DerbyDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
 
b/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
similarity index 100%
rename from 
flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
rename to 
flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
diff --git 
a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSinkITCase.java
 
b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSinkITCase.java
index 769177c4..2f0acb22 100644
--- 
a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSinkITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.db2.database.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.db2.Db2TestBase;
 import org.apache.flink.connector.jdbc.db2.database.dialect.Db2Dialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 
diff --git 
a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSourceITCase.java
 
b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSourceITCase.java
index 13c6d603..c5b94fb0 100644
--- 
a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/database/table/Db2DynamicTableSourceITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.db2.database.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.db2.Db2TestBase;
 import org.apache.flink.connector.jdbc.db2.database.dialect.Db2Dialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSinkITCase.java
index 3b48699a..83d7e118 100644
--- 
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSinkITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.mysql.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.mysql.MySqlTestBase;
 import org.apache.flink.connector.jdbc.mysql.database.dialect.MySqlDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 
diff --git 
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSourceITCase.java
index ff335e5e..1de6b73f 100644
--- 
a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/table/MySqlDynamicTableSourceITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.mysql.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.mysql.MySqlTestBase;
 import org.apache.flink.connector.jdbc.mysql.database.dialect.MySqlDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
index 03532644..ff6522ad 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.oceanbase.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
index 0af6da0a..f32d3150 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.oceanbase.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.oceanbase.OceanBaseMysqlTestBase;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
index 711d64ae..03b91ae8 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.oceanbase.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.oceanbase.OceanBaseOracleTestBase;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
index 03dd6ab9..b4ad8a56 100644
--- 
a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.oceanbase.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.oceanbase.OceanBaseOracleTestBase;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSinkITCase.java
index b467cb72..6fab11d4 100644
--- 
a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSinkITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.oracle.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.oracle.OracleTestBase;
 import org.apache.flink.connector.jdbc.oracle.database.dialect.OracleDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableBuilder;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
diff --git 
a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSourceITCase.java
index 97ff3924..e3e6fff9 100644
--- 
a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/table/OracleDynamicTableSourceITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.oracle.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.oracle.OracleTestBase;
 import org.apache.flink.connector.jdbc.oracle.database.dialect.OracleDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableBuilder;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java
index 2947a3a9..040c883e 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.postgres.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
 import 
org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 
 /** The Table Sink ITCase for {@link PostgresDialect}. */
 class PostgresDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase
diff --git 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
index f2c43e88..079260d9 100644
--- 
a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSourceITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.postgres.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
 import 
org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSinkITCase.java
index 2c2e3eb1..5088a7c2 100644
--- 
a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSinkITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.sqlserver.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.sqlserver.SqlServerTestBase;
 import 
org.apache.flink.connector.jdbc.sqlserver.database.dialect.SqlServerDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 
diff --git 
a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSourceITCase.java
index 34d6161a..8b85d01a 100644
--- 
a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/table/SqlServerDynamicTableSourceITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.jdbc.sqlserver.table;
 
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.sqlserver.SqlServerTestBase;
 import 
org.apache.flink.connector.jdbc.sqlserver.database.dialect.SqlServerDialect;
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.types.Row;
diff --git 
a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSinkITCase.java
 
b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSinkITCase.java
index b1e115aa..58622321 100644
--- 
a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSinkITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.trino.table;
 
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
+import 
org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase;
 import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.trino.TrinoTestBase;
diff --git 
a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSourceITCase.java
index 560fd31f..3c7df607 100644
--- 
a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/table/TrinoDynamicTableSourceITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.trino.table;
 
-import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import 
org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
 import org.apache.flink.connector.jdbc.trino.TrinoTestBase;
 import org.apache.flink.connector.jdbc.trino.database.dialect.TrinoDialect;
diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index 45cc62f4..c038ca2c 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -145,6 +145,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- H2 tests -->
+               <dependency>
+                       <groupId>com.h2database</groupId>
+                       <artifactId>h2</artifactId>
+                       <version>2.2.224</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
similarity index 50%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
index 534f4921..8612e926 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
@@ -18,25 +18,14 @@
 
 package org.apache.flink.connector.jdbc.table;
 
-import 
org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
-import 
org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.annotation.PublicEvolving;
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-/** The result extractor for {@link RowData}. */
-public class RowDataResultExtractor implements ResultExtractor<RowData> {
-
-    private final JdbcDialectConverter jdbcDialectConverter;
-
-    public RowDataResultExtractor(JdbcDialectConverter jdbcDialectConverter) {
-        this.jdbcDialectConverter = 
Preconditions.checkNotNull(jdbcDialectConverter);
-    }
-
-    @Override
-    public RowData extract(ResultSet resultSet) throws SQLException {
-        return jdbcDialectConverter.toInternal(resultSet);
-    }
-}
+/**
+ * Options for the JDBC connector.
+ *
+ * @deprecated please use {@link 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions}
+ */
+@PublicEvolving
+@Deprecated
+public class JdbcConnectorOptions
+        extends 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions {}

Reply via email to