This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit a00115191ff34bfdfc4c8d3d8dfda25451c039b1
Author: Joao Boto <[email protected]>
AuthorDate: Fri Apr 26 11:33:04 2024 +0200

    [FLINK-35365] Reorganize dialect code
---
 .../connector/jdbc/catalog/JdbcCatalogUtils.java   |  8 +-
 ...bstractPostgresCompatibleDialectConverter.java} |  7 +-
 .../table/JdbcFactory.java}                        | 25 +++++--
 .../table/JdbcFactoryLoader.java}                  | 85 ++++++++++++++++------
 .../{ => core/table}/dialect/AbstractDialect.java  |  2 +-
 .../table/dialect/AbstractDialectConverter.java}   |  6 +-
 .../jdbc/{ => core/table}/dialect/JdbcDialect.java |  8 +-
 .../table/dialect/JdbcDialectConverter.java}       |  4 +-
 .../databases/cratedb/dialect/CrateDBDialect.java  |  4 +-
 ...Converter.java => CrateDBDialectConverter.java} |  6 +-
 ...teDBDialectFactory.java => CrateDBFactory.java} |  8 +-
 .../jdbc/databases/db2/dialect/Db2Dialect.java     |  8 +-
 ...2RowConverter.java => Db2DialectConverter.java} |  6 +-
 .../{Db2DialectFactory.java => Db2Factory.java}    |  8 +-
 .../jdbc/databases/derby/dialect/DerbyDialect.java |  8 +-
 ...owConverter.java => DerbyDialectConverter.java} |  6 +-
 ...{DerbyDialectFactory.java => DerbyFactory.java} |  8 +-
 .../databases/mysql/catalog/MySqlTypeMapper.java   |  3 +-
 ...owConverter.java => MySQLDialectConverter.java} |  6 +-
 .../jdbc/databases/mysql/dialect/MySqlDialect.java |  8 +-
 ...{MySqlDialectFactory.java => MySqlFactory.java} |  8 +-
 .../oceanbase/dialect/OceanBaseDialect.java        |  8 +-
 ...nverter.java => OceanBaseDialectConverter.java} |  6 +-
 ...seDialectFactory.java => OceanBaseFactory.java} | 10 +--
 .../databases/oracle/dialect/OracleDialect.java    |  8 +-
 ...wConverter.java => OracleDialectConverter.java} |  6 +-
 ...racleDialectFactory.java => OracleFactory.java} |  8 +-
 .../postgres/dialect/PostgresDialect.java          |  4 +-
 ...onverter.java => PostgresDialectConverter.java} |  6 +-
 ...resDialectFactory.java => PostgresFactory.java} |  8 +-
 .../sqlserver/dialect/SqlServerDialect.java        |  8 +-
 ...nverter.java => SqlServerDialectConverter.java} |  6 +-
 ...erDialectFactory.java => SqlServerFactory.java} |  8 +-
 .../jdbc/databases/trino/dialect/TrinoDialect.java |  8 +-
 ...owConverter.java => TrinoDialectConverter.java} |  6 +-
 ...{TrinoDialectFactory.java => TrinoFactory.java} |  8 +-
 .../dialect/AbstractPostgresCompatibleDialect.java |  1 +
 .../TableInsertOrUpdateStatementExecutor.java      | 14 ++--
 .../executor/TableSimpleStatementExecutor.java     |  7 +-
 .../options/InternalJdbcConnectionOptions.java     |  8 +-
 .../jdbc/internal/options/JdbcDmlOptions.java      |  2 +-
 .../jdbc/table/JdbcDynamicTableFactory.java        | 12 +--
 .../jdbc/table/JdbcDynamicTableSource.java         |  2 +-
 .../jdbc/table/JdbcOutputFormatBuilder.java        |  6 +-
 .../jdbc/table/JdbcRowDataInputFormat.java         | 10 +--
 .../jdbc/table/JdbcRowDataLookupFunction.java      | 12 +--
 .../jdbc/table/RowDataResultExtractor.java         | 10 +--
 ...he.flink.connector.jdbc.core.table.JdbcFactory} | 18 ++---
 ...Test.java => AbstractDialectConverterTest.java} | 10 ++-
 .../db2/dialect/Db2PreparedStatementTest.java      |  7 +-
 .../dialect/OraclePreparedStatementTest.java       |  6 +-
 .../dialect/SqlServerPreparedStatementTest.java    |  6 +-
 .../trino/dialect/TrinoPreparedStatementTest.java  |  7 +-
 .../FieldNamedPreparedStatementImplTest.java       |  7 +-
 .../jdbc/table/JdbcAppendOnlyWriterTest.java       |  4 +-
 ...FilterPushdownPreparedStatementVisitorTest.java |  6 +-
 .../jdbc/table/JdbcRowDataInputFormatTest.java     |  2 +-
 flink-connector-jdbc/pom.xml                       | 20 +++++
 .../jdbc/converter/AbstractJdbcRowConverter.java   | 26 +++----
 .../connector/jdbc/converter/JdbcRowConverter.java | 30 ++++----
 .../connector/jdbc/dialect/AbstractDialect.java    | 38 ++++++++++
 .../flink/connector/jdbc/dialect/JdbcDialect.java  | 31 ++++----
 .../connector/jdbc/dialect/JdbcDialectFactory.java | 53 ++++++++------
 63 files changed, 403 insertions(+), 292 deletions(-)

diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index a1dbf5af..af94b9cb 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog;
 import 
org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect;
 import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog;
 import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
 import 
org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
 import 
org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 
 import java.util.Properties;
 
@@ -71,7 +71,9 @@ public class JdbcCatalogUtils {
             String baseUrl,
             String compatibleMode,
             Properties connectionProperties) {
-        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, compatibleMode, 
userClassLoader);
+
+        JdbcDialect dialect =
+                JdbcFactoryLoader.loadDialect(baseUrl, userClassLoader, 
compatibleMode);
 
         if (dialect instanceof PostgresDialect) {
             return new PostgresCatalog(
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleDialectConverter.java
similarity index 92%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleDialectConverter.java
index 86c6f16c..85417814 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleDialectConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.jdbc.converter;
 
+import 
org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -31,12 +32,12 @@ import java.lang.reflect.Array;
  * Runtime converter that responsible to convert between JDBC object and Flink 
internal object for
  * PostgreSQL compatible databases.
  */
-public abstract class AbstractPostgresCompatibleRowConverter<T extends 
java.sql.Array>
-        extends AbstractJdbcRowConverter {
+public abstract class AbstractPostgresCompatibleDialectConverter<T extends 
java.sql.Array>
+        extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
-    protected AbstractPostgresCompatibleRowConverter(RowType rowType) {
+    protected AbstractPostgresCompatibleDialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactory.java
similarity index 73%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactory.java
index 84f4bd78..448ecaa5 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactory.java
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.dialect;
+package org.apache.flink.connector.jdbc.core.table;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import org.apache.flink.util.StringUtils;
 
 /**
@@ -26,13 +27,13 @@ import org.apache.flink.util.StringUtils;
  * Provider Interfaces (SPI) for discovering.
  *
  * <p>Classes that implement this interface can be added to the
- * 
"META_INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory" 
file of a JAR file
- * in the current classpath to be found.
+ * "META-INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory" 
file of a JAR file in
+ * the current classpath to be found.
  *
  * @see JdbcDialect
  */
 @PublicEvolving
-public interface JdbcDialectFactory {
+public interface JdbcFactory {
 
     /**
      * Retrieves whether the dialect thinks that it can open a connection to 
the given URL.
@@ -45,8 +46,8 @@ public interface JdbcDialectFactory {
      */
     boolean acceptsURL(String url);
 
-    /** @return Creates a new instance of the {@link JdbcDialect}. */
-    JdbcDialect create();
+    /** @return a new instance of the {@link JdbcDialect}. */
+    JdbcDialect createDialect();
 
     /**
      * Creates a new instance of the {@link JdbcDialect} based on compatible 
mode.
@@ -54,11 +55,19 @@ public interface JdbcDialectFactory {
      * @param compatibleMode the compatible mode of database
      * @return a new instance of {@link JdbcDialect}
      */
-    default JdbcDialect create(String compatibleMode) {
+    default JdbcDialect createDialect(String compatibleMode) {
         if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) {
-            return create();
+            return createDialect();
         }
         throw new UnsupportedOperationException(
                 "Not supported option 'compatible-mode' with value: " + 
compatibleMode);
     }
+
+    //    JdbcCatalog createCatalog(
+    //            ClassLoader classLoader,
+    //            String catalogName,
+    //            String defaultDatabase,
+    //            String username,
+    //            String pwd,
+    //            String baseUrl);
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactoryLoader.java
similarity index 52%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactoryLoader.java
index 4e9b9ba0..3549b76d 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactoryLoader.java
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.dialect;
+package org.apache.flink.connector.jdbc.core.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,47 +32,84 @@ import java.util.stream.Collectors;
 
 /** Utility for working with {@link JdbcDialect}. */
 @Internal
-public final class JdbcDialectLoader {
+public final class JdbcFactoryLoader {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcDialectLoader.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcFactoryLoader.class);
 
-    private JdbcDialectLoader() {}
+    private JdbcFactoryLoader() {}
 
-    public static JdbcDialect load(String url, ClassLoader classLoader) {
-        return load(url, null, classLoader);
+    public static JdbcDialect loadDialect(String url, ClassLoader classLoader) 
{
+        return loadDialect(url, classLoader, null);
     }
 
     /**
      * Loads the unique JDBC Dialect that can handle the given database url.
      *
      * @param url A database URL.
-     * @param compatibleMode the compatible mode of database
-     * @param classLoader the classloader used to load the factory
+     * @param classLoader the classloader used to load the factory.
+     * @param compatibleMode the compatible mode of database.
      * @throws IllegalStateException if the loader cannot find exactly one 
dialect that can
      *     unambiguously process the given database URL.
      * @return The loaded dialect.
      */
-    public static JdbcDialect load(String url, String compatibleMode, 
ClassLoader classLoader) {
-        List<JdbcDialectFactory> foundFactories = 
discoverFactories(classLoader);
+    public static JdbcDialect loadDialect(
+            String url, ClassLoader classLoader, String compatibleMode) {
+        return load(url, classLoader).createDialect(compatibleMode);
+    }
+
+    //    public static JdbcCatalog loadCatalog(
+    //            ClassLoader classLoader,
+    //            String catalogName,
+    //            String defaultDatabase,
+    //            String username,
+    //            String pwd,
+    //            String baseUrl) {
+    //        return load(baseUrl, classLoader)
+    //                .createCatalog(
+    //                        classLoader, catalogName, defaultDatabase, 
username, pwd, baseUrl,
+    // null);
+    //    }
+    //
+    //    public static JdbcCatalog loadCatalog(
+    //            ClassLoader classLoader,
+    //            String catalogName,
+    //            String defaultDatabase,
+    //            String username,
+    //            String pwd,
+    //            String baseUrl,
+    //            String compatibleMode) {
+    //        return load(baseUrl, classLoader)
+    //                .createCatalog(
+    //                        classLoader,
+    //                        catalogName,
+    //                        defaultDatabase,
+    //                        username,
+    //                        pwd,
+    //                        baseUrl,
+    //                        compatibleMode);
+    //    }
+
+    private static JdbcFactory load(String url, ClassLoader classLoader) {
+        List<JdbcFactory> foundFactories = discoverFactories(classLoader);
 
         if (foundFactories.isEmpty()) {
             throw new IllegalStateException(
                     String.format(
-                            "Could not find any jdbc dialect factories that 
implement '%s' in the classpath.",
-                            JdbcDialectFactory.class.getName()));
+                            "Could not find any jdbc factories that implement 
'%s' in the classpath.",
+                            JdbcFactory.class.getName()));
         }
 
-        final List<JdbcDialectFactory> matchingFactories =
+        final List<JdbcFactory> matchingFactories =
                 foundFactories.stream().filter(f -> 
f.acceptsURL(url)).collect(Collectors.toList());
 
         if (matchingFactories.isEmpty()) {
             throw new IllegalStateException(
                     String.format(
-                            "Could not find any jdbc dialect factory that can 
handle url '%s' that implements '%s' in the classpath.\n\n"
+                            "Could not find any jdbc factory that can handle 
url '%s' that implements '%s' in the classpath.\n\n"
                                     + "Available factories are:\n\n"
                                     + "%s",
                             url,
-                            JdbcDialectFactory.class.getName(),
+                            JdbcFactory.class.getName(),
                             foundFactories.stream()
                                     .map(f -> f.getClass().getName())
                                     .distinct()
@@ -81,31 +119,30 @@ public final class JdbcDialectLoader {
         if (matchingFactories.size() > 1) {
             throw new IllegalStateException(
                     String.format(
-                            "Multiple jdbc dialect factories can handle url 
'%s' that implement '%s' found in the classpath.\n\n"
+                            "Multiple jdbc factories can handle url '%s' that 
implement '%s' found in the classpath.\n\n"
                                     + "Ambiguous factory classes are:\n\n"
                                     + "%s",
                             url,
-                            JdbcDialectFactory.class.getName(),
+                            JdbcFactory.class.getName(),
                             matchingFactories.stream()
                                     .map(f -> f.getClass().getName())
                                     .sorted()
                                     .collect(Collectors.joining("\n"))));
         }
 
-        return matchingFactories.get(0).create(compatibleMode);
+        return matchingFactories.get(0);
     }
 
-    private static List<JdbcDialectFactory> discoverFactories(ClassLoader 
classLoader) {
+    private static List<JdbcFactory> discoverFactories(ClassLoader 
classLoader) {
         try {
-            final List<JdbcDialectFactory> result = new LinkedList<>();
-            ServiceLoader.load(JdbcDialectFactory.class, classLoader)
+            final List<JdbcFactory> result = new LinkedList<>();
+            ServiceLoader.load(JdbcFactory.class, classLoader)
                     .iterator()
                     .forEachRemaining(result::add);
             return result;
         } catch (ServiceConfigurationError e) {
-            LOG.error("Could not load service provider for jdbc dialects 
factory.", e);
-            throw new RuntimeException(
-                    "Could not load service provider for jdbc dialects 
factory.", e);
+            LOG.error("Could not load service provider for jdbc factory.", e);
+            throw new RuntimeException("Could not load service provider for 
jdbc factory.", e);
         }
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialect.java
similarity index 99%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialect.java
index d838ebe6..29b58427 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialect.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.dialect;
+package org.apache.flink.connector.jdbc.core.table.dialect;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialectConverter.java
similarity index 98%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialectConverter.java
index 1a19d127..95d9bcdd 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialectConverter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.converter;
+package org.apache.flink.connector.jdbc.core.table.dialect;
 
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
 import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
@@ -47,7 +47,7 @@ import java.time.LocalTime;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Base class for all converters that convert between JDBC object and Flink 
internal object. */
-public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
+public abstract class AbstractDialectConverter implements JdbcDialectConverter 
{
 
     protected final RowType rowType;
     protected final JdbcDeserializationConverter[] toInternalConverters;
@@ -56,7 +56,7 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
 
     public abstract String converterName();
 
-    public AbstractJdbcRowConverter(RowType rowType) {
+    public AbstractDialectConverter(RowType rowType) {
         this.rowType = checkNotNull(rowType);
         this.fieldTypes =
                 rowType.getFields().stream()
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialect.java
similarity index 96%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialect.java
index 6cc6bbd5..b748d308 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialect.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.dialect;
+package org.apache.flink.connector.jdbc.core.table.dialect;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -30,7 +30,7 @@ import java.util.Optional;
  * Represents a dialect of SQL implemented by a particular JDBC system. 
Dialects should be immutable
  * and stateless.
  *
- * @see JdbcDialectFactory
+ * @see JdbcFactory
  */
 @PublicEvolving
 public interface JdbcDialect extends Serializable {
@@ -48,7 +48,7 @@ public interface JdbcDialect extends Serializable {
      * @param rowType the given row type
      * @return a row converter for the database
      */
-    JdbcRowConverter getRowConverter(RowType rowType);
+    JdbcDialectConverter getRowConverter(RowType rowType);
 
     /**
      * Get limit clause to limit the number of emitted row from the jdbc 
source.
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialectConverter.java
similarity index 93%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialectConverter.java
index c1f1b1f5..77bb961e 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialectConverter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.converter;
+package org.apache.flink.connector.jdbc.core.table.dialect;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
@@ -31,7 +31,7 @@ import java.sql.SQLException;
  * structure {@link RowData}.
  */
 @PublicEvolving
-public interface JdbcRowConverter extends Serializable {
+public interface JdbcDialectConverter extends Serializable {
 
     /**
      * Convert data retrieved from {@link ResultSet} to internal {@link 
RowData}.
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
index 7592cf2e..c290917b 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
@@ -34,8 +34,8 @@ public class CrateDBDialect extends 
AbstractPostgresCompatibleDialect {
     }
 
     @Override
-    public CrateDBRowConverter getRowConverter(RowType rowType) {
-        return new CrateDBRowConverter(rowType);
+    public CrateDBDialectConverter getRowConverter(RowType rowType) {
+        return new CrateDBDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectConverter.java
similarity index 87%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectConverter.java
index 1412b0d5..80f9db25 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectConverter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
 
-import 
org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import 
org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleDialectConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 import io.crate.shade.org.postgresql.jdbc.PgArray;
@@ -27,11 +27,11 @@ import io.crate.shade.org.postgresql.jdbc.PgArray;
  * Runtime converter that responsible to convert between JDBC object and Flink 
internal object for
  * CrateDB.
  */
-public class CrateDBRowConverter extends 
AbstractPostgresCompatibleRowConverter<PgArray> {
+public class CrateDBDialectConverter extends 
AbstractPostgresCompatibleDialectConverter<PgArray> {
 
     private static final long serialVersionUID = 1L;
 
-    public CrateDBRowConverter(RowType rowType) {
+    public CrateDBDialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBFactory.java
similarity index 83%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBFactory.java
index ac5a8e7d..fcf31b24 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBFactory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link CrateDBDialect}. */
 @Internal
-public class CrateDBDialectFactory implements JdbcDialectFactory {
+public class CrateDBFactory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:crate:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new CrateDBDialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
index e617a9fc..49f71c5b 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.databases.db2.dialect;
 
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -42,8 +42,8 @@ public class Db2Dialect extends AbstractDialect {
     private static final int MIN_DECIMAL_PRECISION = 1;
 
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
-        return new Db2RowConverter(rowType);
+    public JdbcDialectConverter getRowConverter(RowType rowType) {
+        return new Db2DialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectConverter.java
similarity index 87%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectConverter.java
index 8557c822..4ad3fc49 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectConverter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.databases.db2.dialect;
 
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import 
org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.RowType;
  * Runtime converter that responsible to convert between JDBC object and Flink internal object for
  * Db2.
  */
-public class Db2RowConverter extends AbstractJdbcRowConverter {
+public class Db2DialectConverter extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
@@ -35,7 +35,7 @@ public class Db2RowConverter extends AbstractJdbcRowConverter {
         return "Db2";
     }
 
-    public Db2RowConverter(RowType rowType) {
+    public Db2DialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Factory.java
similarity index 83%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Factory.java
index c3a21829..268a20c2 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Factory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.db2.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link Db2Dialect}. */
 @Internal
-public class Db2DialectFactory implements JdbcDialectFactory {
+public class Db2Factory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:db2:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new Db2Dialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java
index dad028f0..93c6c325 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.jdbc.databases.derby.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -45,8 +45,8 @@ public class DerbyDialect extends AbstractDialect {
     private static final int MIN_DECIMAL_PRECISION = 1;
 
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
-        return new DerbyRowConverter(rowType);
+    public JdbcDialectConverter getRowConverter(RowType rowType) {
+        return new DerbyDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectConverter.java
similarity index 86%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectConverter.java
index 7f05a303..846f2c16 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectConverter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.databases.derby.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
  * Derby.
  */
 @Internal
-public class DerbyRowConverter extends AbstractJdbcRowConverter {
+public class DerbyDialectConverter extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
@@ -36,7 +36,7 @@ public class DerbyRowConverter extends AbstractJdbcRowConverter {
         return "Derby";
     }
 
-    public DerbyRowConverter(RowType rowType) {
+    public DerbyDialectConverter(RowType rowType) {
         super(rowType);
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyFactory.java
similarity index 83%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyFactory.java
index 280038dd..639d8695 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyFactory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.derby.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link DerbyDialect}. */
 @Internal
-public class DerbyDialectFactory implements JdbcDialectFactory {
+public class DerbyFactory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:derby:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new DerbyDialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java
index 995e0912..235136dd 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java
@@ -120,7 +120,8 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper {
                 // VARBINARY(n) is not supported in MySqlDialect when 'n' is not equals to
                 // Integer.MAX_VALUE. Please see
                 // org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect#supportedTypes and
-                // org.apache.flink.connector.jdbc.dialect.AbstractDialect#validate for more
+                // org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect#validate for
+                // more
                 // details.
                 return DataTypes.BYTES();
             case MYSQL_TINYINT:
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLDialectConverter.java
similarity index 86%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java
copy to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLDialectConverter.java
index ef5d0fd7..c9c883f8 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLDialectConverter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.databases.mysql.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
@@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.RowType;
  * MySQL.
  */
 @Internal
-public class MySQLRowConverter extends AbstractJdbcRowConverter {
+public class MySQLDialectConverter extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
@@ -36,7 +36,7 @@ public class MySQLRowConverter extends AbstractJdbcRowConverter {
         return "MySQL";
     }
 
-    public MySQLRowConverter(RowType rowType) {
+    public MySQLDialectConverter(RowType rowType) {
         super(rowType);
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java
index 6d79d300..ba25e0eb 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.jdbc.databases.mysql.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -51,8 +51,8 @@ public class MySqlDialect extends AbstractDialect {
     private static final String REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements";
 
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
-        return new MySQLRowConverter(rowType);
+    public JdbcDialectConverter getRowConverter(RowType rowType) {
+        return new MySQLDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlFactory.java
similarity index 83%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlFactory.java
index 71ae04cc..a3544a01 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlFactory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.mysql.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link MySqlDialect}. */
 @Internal
-public class MySqlDialectFactory implements JdbcDialectFactory {
+public class MySqlFactory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:mysql:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new MySqlDialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java
index 885d3392..d190ea6a 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java
@@ -19,10 +19,10 @@
 package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
 import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -84,8 +84,8 @@ public class OceanBaseDialect extends AbstractDialect {
     }
 
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
-        return new OceanBaseRowConverter(rowType);
+    public JdbcDialectConverter getRowConverter(RowType rowType) {
+        return new OceanBaseDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectConverter.java
similarity index 96%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectConverter.java
index 95981217..1bc66254 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectConverter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
@@ -40,7 +40,7 @@ import java.time.LocalDateTime;
  * OceanBase.
  */
 @Internal
-public class OceanBaseRowConverter extends AbstractJdbcRowConverter {
+public class OceanBaseDialectConverter extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
@@ -49,7 +49,7 @@ public class OceanBaseRowConverter extends AbstractJdbcRowConverter {
         return "OceanBase";
     }
 
-    public OceanBaseRowConverter(RowType rowType) {
+    public OceanBaseDialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseFactory.java
similarity index 81%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseFactory.java
index 46152379..1455e310 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseFactory.java
@@ -19,14 +19,14 @@
 package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 import javax.annotation.Nonnull;
 
 /** Factory for {@link OceanBaseDialect}. */
 @Internal
-public class OceanBaseDialectFactory implements JdbcDialectFactory {
+public class OceanBaseFactory implements JdbcFactory {
 
     @Override
     public boolean acceptsURL(String url) {
@@ -34,13 +34,13 @@ public class OceanBaseDialectFactory implements JdbcDialectFactory {
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         throw new UnsupportedOperationException(
                 "Can't create JdbcDialect without compatible mode for OceanBase");
     }
 
     @Override
-    public JdbcDialect create(@Nonnull String compatibleMode) {
+    public JdbcDialect createDialect(@Nonnull String compatibleMode) {
         return new OceanBaseDialect(compatibleMode);
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java
index 1ed061fe..d15a4a5b 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.jdbc.databases.oracle.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -47,8 +47,8 @@ public class OracleDialect extends AbstractDialect {
     private static final int MIN_DECIMAL_PRECISION = 1;
 
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
-        return new OracleRowConverter(rowType);
+    public JdbcDialectConverter getRowConverter(RowType rowType) {
+        return new OracleDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectConverter.java
similarity index 97%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectConverter.java
index b476e661..4acc8f97 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectConverter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.databases.oracle.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
@@ -50,11 +50,11 @@ import java.time.ZonedDateTime;
  * Oracle.
  */
 @Internal
-public class OracleRowConverter extends AbstractJdbcRowConverter {
+public class OracleDialectConverter extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
-    public OracleRowConverter(RowType rowType) {
+    public OracleDialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleFactory.java
similarity index 83%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleFactory.java
index b7bc60ea..78f124b1 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleFactory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.oracle.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link OracleDialect}. */
 @Internal
-public class OracleDialectFactory implements JdbcDialectFactory {
+public class OracleFactory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:oracle:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new OracleDialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
index d0924aee..6973e597 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
@@ -31,8 +31,8 @@ public class PostgresDialect extends AbstractPostgresCompatibleDialect {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public PostgresRowConverter getRowConverter(RowType rowType) {
-        return new PostgresRowConverter(rowType);
+    public PostgresDialectConverter getRowConverter(RowType rowType) {
+        return new PostgresDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectConverter.java
similarity index 87%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectConverter.java
index 2e6e14c2..7ac579a0 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectConverter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.databases.postgres.dialect;
 
-import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleDialectConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.postgresql.jdbc.PgArray;
@@ -27,11 +27,11 @@ import org.postgresql.jdbc.PgArray;
  * Runtime converter that responsible to convert between JDBC object and Flink internal object for
  * PostgreSQL.
  */
-public class PostgresRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {
+public class PostgresDialectConverter extends AbstractPostgresCompatibleDialectConverter<PgArray> {
 
     private static final long serialVersionUID = 1L;
 
-    public PostgresRowConverter(RowType rowType) {
+    public PostgresDialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresFactory.java
similarity index 83%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresFactory.java
index 6286f144..5fe6df8f 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresFactory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.postgres.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link PostgresDialect}. */
 @Internal
-public class PostgresDialectFactory implements JdbcDialectFactory {
+public class PostgresFactory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:postgresql:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new PostgresDialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java
index 91469a6b..f19e6f89 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.jdbc.databases.sqlserver.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -126,8 +126,8 @@ public class SqlServerDialect extends AbstractDialect {
     }
 
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
-        return new SqlServerRowConverter(rowType);
+    public JdbcDialectConverter getRowConverter(RowType rowType) {
+        return new SqlServerDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectConverter.java
similarity index 88%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectConverter.java
index 5f3ac2b4..9c7451b0 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectConverter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.databases.sqlserver.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -28,7 +28,7 @@ import org.apache.flink.table.types.logical.RowType;
  * MsSql.
  */
 @Internal
-public class SqlServerRowConverter extends AbstractJdbcRowConverter {
+public class SqlServerDialectConverter extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
@@ -37,7 +37,7 @@ public class SqlServerRowConverter extends AbstractJdbcRowConverter {
         return "SqlServer";
     }
 
-    public SqlServerRowConverter(RowType rowType) {
+    public SqlServerDialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerFactory.java
similarity index 83%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerFactory.java
index 9b6ee18c..695cca72 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerFactory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.sqlserver.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link SqlServerDialect}. */
 @Internal
-public class SqlServerDialectFactory implements JdbcDialectFactory {
+public class SqlServerFactory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:sqlserver:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new SqlServerDialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java
index 041d24c3..5c02dda0 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.jdbc.databases.trino.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -45,8 +45,8 @@ public class TrinoDialect extends AbstractDialect {
     private static final int MIN_DECIMAL_PRECISION = 1;
 
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
-        return new TrinoRowConverter(rowType);
+    public JdbcDialectConverter getRowConverter(RowType rowType) {
+        return new TrinoDialectConverter(rowType);
     }
 
     @Override
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoRowConverter.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectConverter.java
similarity index 91%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoRowConverter.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectConverter.java
index 92a839e9..a37d6e90 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoRowConverter.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectConverter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.databases.trino.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -32,11 +32,11 @@ import java.math.BigDecimal;
  * Trino.
  */
 @Internal
-public class TrinoRowConverter extends AbstractJdbcRowConverter {
+public class TrinoDialectConverter extends AbstractDialectConverter {
 
     private static final long serialVersionUID = 1L;
 
-    public TrinoRowConverter(RowType rowType) {
+    public TrinoDialectConverter(RowType rowType) {
         super(rowType);
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoFactory.java
similarity index 83%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectFactory.java
rename to 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoFactory.java
index a0e2edda..9283b8b4 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoFactory.java
@@ -19,19 +19,19 @@
 package org.apache.flink.connector.jdbc.databases.trino.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 /** Factory for {@link TrinoDialect}. */
 @Internal
-public class TrinoDialectFactory implements JdbcDialectFactory {
+public class TrinoFactory implements JdbcFactory {
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:trino:");
     }
 
     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new TrinoDialect();
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
index 0ca425f3..2845c75b 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.util.Arrays;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
index 0842177f..17a25c6f 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java
@@ -18,7 +18,7 @@
 package org.apache.flink.connector.jdbc.internal.executor;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
 import org.apache.flink.connector.jdbc.statement.StatementFactory;
 import org.apache.flink.table.data.RowData;
@@ -43,9 +43,9 @@ public final class TableInsertOrUpdateStatementExecutor
     private final StatementFactory insertStmtFactory;
     private final StatementFactory updateStmtFactory;
 
-    private final JdbcRowConverter existSetter;
-    private final JdbcRowConverter insertSetter;
-    private final JdbcRowConverter updateSetter;
+    private final JdbcDialectConverter existSetter;
+    private final JdbcDialectConverter insertSetter;
+    private final JdbcDialectConverter updateSetter;
 
     private final Function<RowData, RowData> keyExtractor;
 
@@ -57,9 +57,9 @@ public final class TableInsertOrUpdateStatementExecutor
             StatementFactory existStmtFactory,
             StatementFactory insertStmtFactory,
             StatementFactory updateStmtFactory,
-            JdbcRowConverter existSetter,
-            JdbcRowConverter insertSetter,
-            JdbcRowConverter updateSetter,
+            JdbcDialectConverter existSetter,
+            JdbcDialectConverter insertSetter,
+            JdbcDialectConverter updateSetter,
             Function<RowData, RowData> keyExtractor) {
         this.existStmtFactory = checkNotNull(existStmtFactory);
         this.insertStmtFactory = checkNotNull(insertStmtFactory);
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
index d7d74ba2..59866b51 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.internal.executor;
 
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
 import org.apache.flink.connector.jdbc.statement.StatementFactory;
 import org.apache.flink.table.data.RowData;
@@ -35,7 +35,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public final class TableSimpleStatementExecutor implements 
JdbcBatchStatementExecutor<RowData> {
 
     private final StatementFactory stmtFactory;
-    private final JdbcRowConverter converter;
+    private final JdbcDialectConverter converter;
 
     private transient FieldNamedPreparedStatement st;
 
@@ -43,7 +43,8 @@ public final class TableSimpleStatementExecutor implements 
JdbcBatchStatementExe
      * Keep in mind object reuse: if it's on then key extractor may be 
required to return new
      * object.
      */
-    public TableSimpleStatementExecutor(StatementFactory stmtFactory, JdbcRowConverter converter) {
+    public TableSimpleStatementExecutor(
+            StatementFactory stmtFactory, JdbcDialectConverter converter) {
         this.stmtFactory = checkNotNull(stmtFactory);
         this.converter = checkNotNull(converter);
     }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
index 2083b437..753e5cab 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.jdbc.internal.options;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -179,7 +179,7 @@ public class InternalJdbcConnectionOptions extends 
JdbcConnectionOptions {
 
         /**
          * optional, Handle the SQL dialect of jdbc driver. If not set, it 
will be infer by {@link
-         * JdbcDialectLoader#load} from DB url.
+         * JdbcFactoryLoader#load} from DB url.
          */
         public Builder setDialect(JdbcDialect dialect) {
             this.dialect = dialect;
@@ -205,7 +205,7 @@ public class InternalJdbcConnectionOptions extends 
JdbcConnectionOptions {
                 if (classLoader == null) {
                     classLoader = 
Thread.currentThread().getContextClassLoader();
                 }
-                this.dialect = JdbcDialectLoader.load(dbURL, compatibleMode, classLoader);
+                this.dialect = JdbcFactoryLoader.loadDialect(dbURL, classLoader, compatibleMode);
             }
             if (this.driverName == null) {
                 Optional<String> optional = dialect.defaultDriverName();
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java
index a889a679..776fe431 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.jdbc.internal.options;
 
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 58363a52..17ba340d 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
@@ -131,7 +131,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
 
     private static void validateDataTypeWithJdbcDialect(
             DataType dataType, String url, String compatibleMode, ClassLoader 
classLoader) {
-        final JdbcDialect dialect = JdbcDialectLoader.load(url, compatibleMode, classLoader);
+        final JdbcDialect dialect = JdbcFactoryLoader.loadDialect(url, classLoader, compatibleMode);
         dialect.validate((RowType) dataType.getLogicalType());
     }
 
@@ -144,8 +144,8 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                         .setDBUrl(url)
                         .setTableName(readableConfig.get(TABLE_NAME))
                         .setDialect(
-                                JdbcDialectLoader.load(
-                                        url, readableConfig.get(COMPATIBLE_MODE), classLoader))
+                                JdbcFactoryLoader.loadDialect(
+                                        url, classLoader, readableConfig.get(COMPATIBLE_MODE)))
                         
.setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
                         .setConnectionCheckTimeoutSeconds(
                                 (int) 
readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
@@ -283,7 +283,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
 
     private void validateConfigOptions(ReadableConfig config, ClassLoader 
classLoader) {
         String jdbcUrl = config.get(URL);
-        JdbcDialectLoader.load(jdbcUrl, config.get(COMPATIBLE_MODE), classLoader);
+        JdbcFactoryLoader.loadDialect(jdbcUrl, classLoader, config.get(COMPATIBLE_MODE));
 
         checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index c8ef2e33..bc2c1732 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -19,7 +19,7 @@
 package org.apache.flink.connector.jdbc.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
 import 
org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java
index 745d2c02..554aaaf2 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java
@@ -19,9 +19,9 @@
 package org.apache.flink.connector.jdbc.table;
 
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import 
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
@@ -174,7 +174,7 @@ public class JdbcOutputFormatBuilder implements 
Serializable {
 
     private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor(
             JdbcDialect dialect, String[] fieldNames, LogicalType[] 
fieldTypes, final String sql) {
-        final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes));
+        final JdbcDialectConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes));
         return new TableSimpleStatementExecutor(
                 connection ->
                         
FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames),
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
index 7e5dc2a3..a4990a73 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
@@ -67,7 +67,7 @@ public class JdbcRowDataInputFormat extends 
RichInputFormat<RowData, InputSplit>
     private String queryTemplate;
     private int resultSetType;
     private int resultSetConcurrency;
-    private JdbcRowConverter rowConverter;
+    private JdbcDialectConverter rowConverter;
     private TypeInformation<RowData> rowDataTypeInfo;
 
     private transient PreparedStatement statement;
@@ -82,7 +82,7 @@ public class JdbcRowDataInputFormat extends 
RichInputFormat<RowData, InputSplit>
             String queryTemplate,
             int resultSetType,
             int resultSetConcurrency,
-            JdbcRowConverter rowConverter,
+            JdbcDialectConverter rowConverter,
             TypeInformation<RowData> rowDataTypeInfo) {
         this.connectionProvider = connectionProvider;
         this.fetchSize = fetchSize;
@@ -303,7 +303,7 @@ public class JdbcRowDataInputFormat extends 
RichInputFormat<RowData, InputSplit>
         private Boolean autoCommit;
         private Object[][] parameterValues;
         private String queryTemplate;
-        private JdbcRowConverter rowConverter;
+        private JdbcDialectConverter rowConverter;
         private TypeInformation<RowData> rowDataTypeInfo;
         private int resultSetType = ResultSet.TYPE_FORWARD_ONLY;
         private int resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
@@ -347,7 +347,7 @@ public class JdbcRowDataInputFormat extends 
RichInputFormat<RowData, InputSplit>
             return this;
         }
 
-        public Builder setRowConverter(JdbcRowConverter rowConverter) {
+        public Builder setRowConverter(JdbcDialectConverter rowConverter) {
             this.rowConverter = rowConverter;
             return this;
         }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
index 23ddf3c7..d3e95b31 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
@@ -20,10 +20,10 @@ package org.apache.flink.connector.jdbc.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import 
org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
 import org.apache.flink.table.data.RowData;
@@ -62,8 +62,8 @@ public class JdbcRowDataLookupFunction extends LookupFunction 
{
     private final JdbcConnectionProvider connectionProvider;
     private final String[] keyNames;
     private final int maxRetryTimes;
-    private final JdbcRowConverter jdbcRowConverter;
-    private final JdbcRowConverter lookupKeyRowConverter;
+    private final JdbcDialectConverter jdbcDialectConverter;
+    private final JdbcDialectConverter lookupKeyRowConverter;
 
     private final List<String> resolvedPredicates;
     private final Serializable[] pushdownParams;
@@ -105,7 +105,7 @@ public class JdbcRowDataLookupFunction extends 
LookupFunction {
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), 
fieldNames, keyNames);
         JdbcDialect jdbcDialect = options.getDialect();
-        this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
+        this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);
         this.lookupKeyRowConverter =
                 jdbcDialect.getRowConverter(
                         RowType.of(
@@ -151,7 +151,7 @@ public class JdbcRowDataLookupFunction extends 
LookupFunction {
                 try (ResultSet resultSet = statement.executeQuery()) {
                     ArrayList<RowData> rows = new ArrayList<>();
                     while (resultSet.next()) {
-                        RowData row = jdbcRowConverter.toInternal(resultSet);
+                        RowData row = jdbcDialectConverter.toInternal(resultSet);
                         rows.add(row);
                     }
                     rows.trimToSize();
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
index 6a191f5a..3fc1e422 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.table;
 
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
@@ -29,14 +29,14 @@ import java.sql.SQLException;
 /** The result extractor for {@link RowData}. */
 public class RowDataResultExtractor implements ResultExtractor<RowData> {
 
-    private final JdbcRowConverter jdbcRowConverter;
+    private final JdbcDialectConverter jdbcDialectConverter;
 
-    public RowDataResultExtractor(JdbcRowConverter jdbcRowConverter) {
-        this.jdbcRowConverter = Preconditions.checkNotNull(jdbcRowConverter);
+    public RowDataResultExtractor(JdbcDialectConverter jdbcDialectConverter) {
+        this.jdbcDialectConverter = Preconditions.checkNotNull(jdbcDialectConverter);
     }
 
     @Override
     public RowData extract(ResultSet resultSet) throws SQLException {
-        return jdbcRowConverter.toInternal(resultSet);
+        return jdbcDialectConverter.toInternal(resultSet);
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
 
b/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory
similarity index 69%
rename from 
flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
rename to 
flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory
index 9a3cf26c..13eecd1a 100644
--- 
a/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++ 
b/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyDialectFactory
-org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialectFactory
-org.apache.flink.connector.jdbc.databases.oceanbase.dialect.OceanBaseDialectFactory
-org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactory
-org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory
-org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory
-org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
-org.apache.flink.connector.jdbc.databases.db2.dialect.Db2DialectFactory
-org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoDialectFactory
+org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyFactory
+org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlFactory
+org.apache.flink.connector.jdbc.databases.oceanbase.dialect.OceanBaseFactory
+org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresFactory
+org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleFactory
+org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerFactory
+org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBFactory
+org.apache.flink.connector.jdbc.databases.db2.dialect.Db2Factory
+org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoFactory
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractDialectConverterTest.java
similarity index 85%
rename from 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
rename to 
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractDialectConverterTest.java
index 9f9fee1e..bfb77392 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractDialectConverterTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.jdbc.converter;
 
+import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -31,14 +33,14 @@ import java.time.LocalDateTime;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link AbstractJdbcRowConverter}. */
-class AbstractJdbcRowConverterTest {
+/** Test for {@link AbstractDialectConverter}. */
+class AbstractDialectConverterTest {
 
     @Test
     void testExternalLocalDateTimeToTimestamp() throws Exception {
         RowType rowType = RowType.of(new IntType(), new TimestampType(3));
-        JdbcRowConverter rowConverter =
-                new AbstractJdbcRowConverter(rowType) {
+        JdbcDialectConverter rowConverter =
+                new AbstractDialectConverter(rowType) {
 
                     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java
index 162c3bc1..8a8ea588 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.databases.db2.dialect;
 
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 import org.junit.jupiter.api.Test;
 
@@ -29,7 +29,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 class Db2PreparedStatementTest {
 
     private final JdbcDialect dialect =
-            JdbcDialectLoader.load("jdbc:db2://localhost:3306/test", 
getClass().getClassLoader());
+            JdbcFactoryLoader.loadDialect(
+                    "jdbc:db2://localhost:3306/test", 
getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
     private final String[] keyFields = new String[] {"id", "__field_3__"};
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java
index f69f6a33..372493b2 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.databases.oracle.dialect;
 
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
 
 import org.junit.jupiter.api.Test;
@@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 class OraclePreparedStatementTest {
 
     private final JdbcDialect dialect =
-            JdbcDialectLoader.load(
+            JdbcFactoryLoader.loadDialect(
                     "jdbc:oracle://localhost:3306/test", 
getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java
index 33fc3ed3..fbe57835 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.databases.sqlserver.dialect;
 
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 import org.junit.jupiter.api.Test;
 
@@ -29,7 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 class SqlServerPreparedStatementTest {
 
     private final JdbcDialect dialect =
-            JdbcDialectLoader.load(
+            JdbcFactoryLoader.loadDialect(
                     "jdbc:sqlserver://localhost:3306/test", 
getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java
index 847f1a7f..8b3d9854 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.databases.trino.dialect;
 
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
 
 import org.junit.jupiter.api.Test;
@@ -37,7 +37,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 class TrinoPreparedStatementTest {
 
     private final JdbcDialect dialect =
-            JdbcDialectLoader.load("jdbc:trino://localhost:3306/test", 
getClass().getClassLoader());
+            JdbcFactoryLoader.loadDialect(
+                    "jdbc:trino://localhost:3306/test", 
getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
     private final String[] keyFields = new String[] {"id", "__field_3__"};
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
index b1d5684e..ff969b7d 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.statement;
 
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 
 import org.junit.jupiter.api.Test;
 
@@ -35,7 +35,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 class FieldNamedPreparedStatementImplTest {
 
     private final JdbcDialect dialect =
-            JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", 
getClass().getClassLoader());
+            JdbcFactoryLoader.loadDialect(
+                    "jdbc:mysql://localhost:3306/test", 
getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", 
"__field_3__"};
     private final String[] keyFields = new String[] {"id", "__field_3__"};
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
index 7209dbfa..cad629f0 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.table;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
 import org.apache.flink.connector.jdbc.internal.RowJdbcOutputFormat;
@@ -62,7 +62,7 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase {
                                                     
InternalJdbcConnectionOptions.builder()
                                                             
.setDBUrl(getMetadata().getJdbcUrl())
                                                             .setDialect(
-                                                                    
JdbcDialectLoader.load(
+                                                                    
JdbcFactoryLoader.loadDialect(
                                                                             
getMetadata()
                                                                                
     .getJdbcUrl(),
                                                                             
getClass()
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
index ec08391d..147a254e 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.jdbc.table;
 
 import org.apache.flink.connector.jdbc.JdbcTestBase;
-import 
org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyDialectFactory;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyFactory;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
@@ -221,7 +221,7 @@ class JdbcFilterPushdownPreparedStatementVisitorTest {
             Serializable[] expectedParams) {
         List<ResolvedExpression> resolved = 
resolveSQLFilterToExpression(inputExpr, schema);
         assertThat(resolved.size()).isEqualTo(1);
-        JdbcDialect dialect = new DerbyDialectFactory().create();
+        JdbcDialect dialect = new DerbyFactory().createDialect();
         JdbcFilterPushdownPreparedStatementVisitor visitor =
                 new 
JdbcFilterPushdownPreparedStatementVisitor(dialect::quoteIdentifier);
         ParameterizedPredicate pred = resolved.get(0).accept(visitor).get();
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
index b2e4deea..3f8d19ea 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.table;
 
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import 
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import 
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index 68d05f08..5e507f93 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -42,6 +42,22 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-annotations</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+
        </dependencies>
 
        <build>
@@ -60,7 +76,11 @@ under the License.
                                                        
<shadeTestJar>true</shadeTestJar>
                                                        <artifactSet>
                                                                <excludes>
+                                                                       
<exclude>org.apache.flink:flink-annotations</exclude>
+                                                                       
<exclude>org.apache.flink:flink-core</exclude>
                                                                        
<exclude>org.apache.flink:flink-connector-base</exclude>
+                                                                       
<exclude>org.apache.flink:flink-table-common</exclude>
+                                                                       
<exclude>org.apache.flink:flink-connector-jdbc-core:tests</exclude>
                                                                </excludes>
                                                        </artifactSet>
                                                        <filters 
combine.children="append">
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java
similarity index 61%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java
index ef5d0fd7..f45f0c09 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java
@@ -16,27 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.databases.mysql.dialect;
+package org.apache.flink.connector.jdbc.converter;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import 
org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
- * Runtime converter that responsible to convert between JDBC object and Flink 
internal object for
- * MySQL.
- */
-@Internal
-public class MySQLRowConverter extends AbstractJdbcRowConverter {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public String converterName() {
-        return "MySQL";
-    }
-
-    public MySQLRowConverter(RowType rowType) {
+ * Base class for all converters that convert between JDBC object and Flink 
internal object.
+ *
+ * @deprecated use {@link AbstractDialectConverter}
+ */
+@Deprecated
+public abstract class AbstractJdbcRowConverter extends 
AbstractDialectConverter {
+    public AbstractJdbcRowConverter(RowType rowType) {
         super(rowType);
     }
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java
similarity index 59%
copy from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
copy to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java
index c3a21829..923fe222 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java
@@ -16,22 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.databases.db2.dialect;
+package org.apache.flink.connector.jdbc.converter;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter;
+import org.apache.flink.table.data.RowData;
 
-/** Factory for {@link Db2Dialect}. */
-@Internal
-public class Db2DialectFactory implements JdbcDialectFactory {
-    @Override
-    public boolean acceptsURL(String url) {
-        return url.startsWith("jdbc:db2:");
-    }
-
-    @Override
-    public JdbcDialect create() {
-        return new Db2Dialect();
-    }
-}
+/**
+ * Converter that is responsible for converting between JDBC objects and the Flink SQL 
internal data
+ * structure {@link RowData}.
+ *
+ * @deprecated use {@link JdbcDialectConverter}
+ */
+@Deprecated
+@PublicEvolving
+public interface JdbcRowConverter extends JdbcDialectConverter {}
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
new file mode 100644
index 00000000..28b2d8a2
--- /dev/null
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect;
+
+/**
+ * Base class for {@link JdbcDialect JdbcDialects} that implements basic data 
type validation and
+ * the construction of basic {@code INSERT}, {@code UPDATE}, {@code DELETE}, 
and {@code SELECT}
+ * statements.
+ *
+ * <p>Implementors should be careful to check the default SQL statements are 
performant for their
+ * specific dialect and override them if necessary.
+ *
+ * @deprecated use 
org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect
+ */
+@Deprecated
+@PublicEvolving
+public abstract class AbstractDialect
+        extends 
org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect
+        implements org.apache.flink.connector.jdbc.dialect.JdbcDialect {}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
similarity index 59%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
index c3a21829..54a61d4c 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
@@ -16,22 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.jdbc.databases.db2.dialect;
+package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
 
-/** Factory for {@link Db2Dialect}. */
-@Internal
-public class Db2DialectFactory implements JdbcDialectFactory {
-    @Override
-    public boolean acceptsURL(String url) {
-        return url.startsWith("jdbc:db2:");
-    }
-
-    @Override
-    public JdbcDialect create() {
-        return new Db2Dialect();
-    }
-}
+/**
+ * Represents a dialect of SQL implemented by a particular JDBC system. 
Dialects should be immutable
+ * and stateless.
+ *
+ * @see JdbcFactory
+ * @deprecated use 
org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect
+ */
+@Deprecated
+@PublicEvolving
+public interface JdbcDialect
+        extends org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect 
{}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
similarity index 50%
rename from 
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
rename to 
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
index 84f4bd78..3fc31c49 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
@@ -18,41 +18,34 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.core.table.JdbcFactory;
 import org.apache.flink.util.StringUtils;
 
 /**
- * A factory to create a specific {@link JdbcDialect}. This factory is used 
with Java's Service
- * Provider Interfaces (SPI) for discovering.
+ * A factory to create a specific {@link 
org.apache.flink.connector.jdbc.dialect.JdbcDialect}. This
+ * factory is used with Java's Service Provider Interfaces (SPI) for 
discovering.
  *
  * <p>Classes that implement this interface can be added to the
- * 
"META_INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory" 
file of a JAR file
- * in the current classpath to be found.
+ * "META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory" file 
of a JAR file in the
+ * current classpath to be found.
  *
- * @see JdbcDialect
+ * @see org.apache.flink.connector.jdbc.dialect.JdbcDialect
+ * @deprecated use JdbcFactory
  */
-@PublicEvolving
-public interface JdbcDialectFactory {
+@Deprecated
+public interface JdbcDialectFactory extends JdbcFactory {
 
     /**
-     * Retrieves whether the dialect thinks that it can open a connection to 
the given URL.
-     * Typically, dialects will return <code>true</code> if they understand 
the sub-protocol
-     * specified in the URL and <code>false</code> if they do not.
-     *
-     * @param url the URL of the database
-     * @return <code>true</code> if this dialect understands the given URL; 
<code>false</code>
-     *     otherwise.
+     * @deprecated use {@link #createDialect()}
+     * @return a new instance of the {@link
+     *     org.apache.flink.connector.jdbc.dialect.JdbcDialect}.
      */
-    boolean acceptsURL(String url);
-
-    /** @return Creates a new instance of the {@link JdbcDialect}. */
     JdbcDialect create();
 
     /**
-     * Creates a new instance of the {@link JdbcDialect} based on compatible 
mode.
-     *
+     * @deprecated use {@link #createDialect(String)}
      * @param compatibleMode the compatible mode of database
-     * @return a new instance of {@link JdbcDialect}
+     * @return a new instance of {@link 
org.apache.flink.connector.jdbc.dialect.JdbcDialect}
      */
     default JdbcDialect create(String compatibleMode) {
         if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) {
@@ -61,4 +54,22 @@ public interface JdbcDialectFactory {
         throw new UnsupportedOperationException(
                 "Not supported option 'compatible-mode' with value: " + 
compatibleMode);
     }
+
+    /**
+     * @return a new instance of the {@link 
org.apache.flink.connector.jdbc.dialect.JdbcDialect}.
+     */
+    default JdbcDialect createDialect() {
+        return create();
+    }
+
+    /**
+     * Creates a new instance of the {@link 
org.apache.flink.connector.jdbc.dialect.JdbcDialect}
+     * based on compatible mode.
+     *
+     * @param compatibleMode the compatible mode of database
+     * @return a new instance of {@link 
org.apache.flink.connector.jdbc.dialect.JdbcDialect}
+     */
+    default JdbcDialect createDialect(String compatibleMode) {
+        return create(compatibleMode);
+    }
 }

Reply via email to