This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit a00115191ff34bfdfc4c8d3d8dfda25451c039b1 Author: Joao Boto <[email protected]> AuthorDate: Fri Apr 26 11:33:04 2024 +0200 [FLINK-35365] Reorganize dialect code --- .../connector/jdbc/catalog/JdbcCatalogUtils.java | 8 +- ...bstractPostgresCompatibleDialectConverter.java} | 7 +- .../table/JdbcFactory.java} | 25 +++++-- .../table/JdbcFactoryLoader.java} | 85 ++++++++++++++++------ .../{ => core/table}/dialect/AbstractDialect.java | 2 +- .../table/dialect/AbstractDialectConverter.java} | 6 +- .../jdbc/{ => core/table}/dialect/JdbcDialect.java | 8 +- .../table/dialect/JdbcDialectConverter.java} | 4 +- .../databases/cratedb/dialect/CrateDBDialect.java | 4 +- ...Converter.java => CrateDBDialectConverter.java} | 6 +- ...teDBDialectFactory.java => CrateDBFactory.java} | 8 +- .../jdbc/databases/db2/dialect/Db2Dialect.java | 8 +- ...2RowConverter.java => Db2DialectConverter.java} | 6 +- .../{Db2DialectFactory.java => Db2Factory.java} | 8 +- .../jdbc/databases/derby/dialect/DerbyDialect.java | 8 +- ...owConverter.java => DerbyDialectConverter.java} | 6 +- ...{DerbyDialectFactory.java => DerbyFactory.java} | 8 +- .../databases/mysql/catalog/MySqlTypeMapper.java | 3 +- ...owConverter.java => MySQLDialectConverter.java} | 6 +- .../jdbc/databases/mysql/dialect/MySqlDialect.java | 8 +- ...{MySqlDialectFactory.java => MySqlFactory.java} | 8 +- .../oceanbase/dialect/OceanBaseDialect.java | 8 +- ...nverter.java => OceanBaseDialectConverter.java} | 6 +- ...seDialectFactory.java => OceanBaseFactory.java} | 10 +-- .../databases/oracle/dialect/OracleDialect.java | 8 +- ...wConverter.java => OracleDialectConverter.java} | 6 +- ...racleDialectFactory.java => OracleFactory.java} | 8 +- .../postgres/dialect/PostgresDialect.java | 4 +- ...onverter.java => PostgresDialectConverter.java} | 6 +- ...resDialectFactory.java => PostgresFactory.java} | 8 +- .../sqlserver/dialect/SqlServerDialect.java | 8 +- ...nverter.java => SqlServerDialectConverter.java} | 6 +- ...erDialectFactory.java => SqlServerFactory.java} | 8 +- .../jdbc/databases/trino/dialect/TrinoDialect.java | 8 +- ...owConverter.java => TrinoDialectConverter.java} | 6 +- ...{TrinoDialectFactory.java => TrinoFactory.java} | 8 +- .../dialect/AbstractPostgresCompatibleDialect.java | 1 + .../TableInsertOrUpdateStatementExecutor.java | 14 ++-- .../executor/TableSimpleStatementExecutor.java | 7 +- .../options/InternalJdbcConnectionOptions.java | 8 +- .../jdbc/internal/options/JdbcDmlOptions.java | 2 +- .../jdbc/table/JdbcDynamicTableFactory.java | 12 +-- .../jdbc/table/JdbcDynamicTableSource.java | 2 +- .../jdbc/table/JdbcOutputFormatBuilder.java | 6 +- .../jdbc/table/JdbcRowDataInputFormat.java | 10 +-- .../jdbc/table/JdbcRowDataLookupFunction.java | 12 +-- .../jdbc/table/RowDataResultExtractor.java | 10 +-- ...he.flink.connector.jdbc.core.table.JdbcFactory} | 18 ++--- ...Test.java => AbstractDialectConverterTest.java} | 10 ++- .../db2/dialect/Db2PreparedStatementTest.java | 7 +- .../dialect/OraclePreparedStatementTest.java | 6 +- .../dialect/SqlServerPreparedStatementTest.java | 6 +- .../trino/dialect/TrinoPreparedStatementTest.java | 7 +- .../FieldNamedPreparedStatementImplTest.java | 7 +- .../jdbc/table/JdbcAppendOnlyWriterTest.java | 4 +- ...FilterPushdownPreparedStatementVisitorTest.java | 6 +- .../jdbc/table/JdbcRowDataInputFormatTest.java | 2 +- flink-connector-jdbc/pom.xml | 20 +++++ .../jdbc/converter/AbstractJdbcRowConverter.java | 26 +++---- .../connector/jdbc/converter/JdbcRowConverter.java | 30 ++++---- .../connector/jdbc/dialect/AbstractDialect.java | 38 ++++++++++ .../flink/connector/jdbc/dialect/JdbcDialect.java | 31 ++++---- .../connector/jdbc/dialect/JdbcDialectFactory.java | 53 ++++++++------ 63 files changed, 403 insertions(+), 292 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java index a1dbf5af..af94b9cb 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java @@ -18,14 +18,14 @@ package org.apache.flink.connector.jdbc.catalog; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog; import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect; import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog; import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect; import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog; import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; import java.util.Properties; @@ -71,7 +71,9 @@ public class JdbcCatalogUtils { String baseUrl, String compatibleMode, Properties connectionProperties) { - JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, compatibleMode, userClassLoader); + + JdbcDialect dialect = + JdbcFactoryLoader.loadDialect(baseUrl, userClassLoader, compatibleMode); if (dialect instanceof PostgresDialect) { return new PostgresCatalog( diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleDialectConverter.java similarity index 92% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleDialectConverter.java index 86c6f16c..85417814 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleDialectConverter.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.converter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; @@ -31,12 +32,12 @@ import java.lang.reflect.Array; * Runtime converter that responsible to convert between JDBC object and Flink internal object for * PostgreSQL compatible databases. */ -public abstract class AbstractPostgresCompatibleRowConverter<T extends java.sql.Array> - extends AbstractJdbcRowConverter { +public abstract class AbstractPostgresCompatibleDialectConverter<T extends java.sql.Array> + extends AbstractDialectConverter { private static final long serialVersionUID = 1L; - protected AbstractPostgresCompatibleRowConverter(RowType rowType) { + protected AbstractPostgresCompatibleDialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactory.java similarity index 73% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactory.java index 84f4bd78..448ecaa5 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactory.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.dialect; +package org.apache.flink.connector.jdbc.core.table; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.util.StringUtils; /** @@ -26,13 +27,13 @@ import org.apache.flink.util.StringUtils; * Provider Interfaces (SPI) for discovering. * * <p>Classes that implement this interface can be added to the - * "META_INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory" file of a JAR file - * in the current classpath to be found. + * "META_INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory" file of a JAR file in + * the current classpath to be found. * * @see JdbcDialect */ @PublicEvolving -public interface JdbcDialectFactory { +public interface JdbcFactory { /** * Retrieves whether the dialect thinks that it can open a connection to the given URL. @@ -45,8 +46,8 @@ public interface JdbcDialectFactory { */ boolean acceptsURL(String url); - /** @return Creates a new instance of the {@link JdbcDialect}. */ - JdbcDialect create(); + /** @return a new instance of the {@link JdbcDialect}. */ + JdbcDialect createDialect(); /** * Creates a new instance of the {@link JdbcDialect} based on compatible mode. @@ -54,11 +55,19 @@ public interface JdbcDialectFactory { * @param compatibleMode the compatible mode of database * @return a new instance of {@link JdbcDialect} */ - default JdbcDialect create(String compatibleMode) { + default JdbcDialect createDialect(String compatibleMode) { if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) { - return create(); + return createDialect(); } throw new UnsupportedOperationException( "Not supported option 'compatible-mode' with value: " + compatibleMode); } + + // JdbcCatalog createCatalog( + // ClassLoader classLoader, + // String catalogName, + // String defaultDatabase, + // String username, + // String pwd, + // String baseUrl); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactoryLoader.java similarity index 52% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactoryLoader.java index 4e9b9ba0..3549b76d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcFactoryLoader.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.dialect; +package org.apache.flink.connector.jdbc.core.table; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,47 +32,84 @@ import java.util.stream.Collectors; /** Utility for working with {@link JdbcDialect}. */ @Internal -public final class JdbcDialectLoader { +public final class JdbcFactoryLoader { - private static final Logger LOG = LoggerFactory.getLogger(JdbcDialectLoader.class); + private static final Logger LOG = LoggerFactory.getLogger(JdbcFactoryLoader.class); - private JdbcDialectLoader() {} + private JdbcFactoryLoader() {} - public static JdbcDialect load(String url, ClassLoader classLoader) { - return load(url, null, classLoader); + public static JdbcDialect loadDialect(String url, ClassLoader classLoader) { + return loadDialect(url, classLoader, null); } /** * Loads the unique JDBC Dialect that can handle the given database url. * * @param url A database URL. - * @param compatibleMode the compatible mode of database - * @param classLoader the classloader used to load the factory + * @param classLoader the classloader used to load the factory. + * @param compatibleMode the compatible mode of database. * @throws IllegalStateException if the loader cannot find exactly one dialect that can * unambiguously process the given database URL. * @return The loaded dialect. */ - public static JdbcDialect load(String url, String compatibleMode, ClassLoader classLoader) { - List<JdbcDialectFactory> foundFactories = discoverFactories(classLoader); + public static JdbcDialect loadDialect( + String url, ClassLoader classLoader, String compatibleMode) { + return load(url, classLoader).createDialect(compatibleMode); + } + + // public static JdbcCatalog loadCatalog( + // ClassLoader classLoader, + // String catalogName, + // String defaultDatabase, + // String username, + // String pwd, + // String baseUrl) { + // return load(baseUrl, classLoader) + // .createCatalog( + // classLoader, catalogName, defaultDatabase, username, pwd, baseUrl, + // null); + // } + // + // public static JdbcCatalog loadCatalog( + // ClassLoader classLoader, + // String catalogName, + // String defaultDatabase, + // String username, + // String pwd, + // String baseUrl, + // String compatibleMode) { + // return load(baseUrl, classLoader) + // .createCatalog( + // classLoader, + // catalogName, + // defaultDatabase, + // username, + // pwd, + // baseUrl, + // compatibleMode); + // } + + private static JdbcFactory load(String url, ClassLoader classLoader) { + List<JdbcFactory> foundFactories = discoverFactories(classLoader); if (foundFactories.isEmpty()) { throw new IllegalStateException( String.format( - "Could not find any jdbc dialect factories that implement '%s' in the classpath.", - JdbcDialectFactory.class.getName())); + "Could not find any jdbc factories that implement '%s' in the classpath.", + JdbcFactory.class.getName())); } - final List<JdbcDialectFactory> matchingFactories = + final List<JdbcFactory> matchingFactories = foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList()); if (matchingFactories.isEmpty()) { throw new IllegalStateException( String.format( - "Could not find any jdbc dialect factory that can handle url '%s' that implements '%s' in the classpath.\n\n" + "Could not find any jdbc factory that can handle url '%s' that implements '%s' in the classpath.\n\n" + "Available factories are:\n\n" + "%s", url, - JdbcDialectFactory.class.getName(), + JdbcFactory.class.getName(), foundFactories.stream() .map(f -> f.getClass().getName()) .distinct() @@ -81,31 +119,30 @@ public final class JdbcDialectLoader { if (matchingFactories.size() > 1) { throw new IllegalStateException( String.format( - "Multiple jdbc dialect factories can handle url '%s' that implement '%s' found in the classpath.\n\n" + "Multiple jdbc factories can handle url '%s' that implement '%s' found in the classpath.\n\n" + "Ambiguous factory classes are:\n\n" + "%s", url, - JdbcDialectFactory.class.getName(), + JdbcFactory.class.getName(), matchingFactories.stream() .map(f -> f.getClass().getName()) .sorted() .collect(Collectors.joining("\n")))); } - return matchingFactories.get(0).create(compatibleMode); + return matchingFactories.get(0); } - private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) { + private static List<JdbcFactory> discoverFactories(ClassLoader classLoader) { try { - final List<JdbcDialectFactory> result = new LinkedList<>(); - ServiceLoader.load(JdbcDialectFactory.class, classLoader) + final List<JdbcFactory> result = new LinkedList<>(); + ServiceLoader.load(JdbcFactory.class, classLoader) .iterator() .forEachRemaining(result::add); return result; } catch (ServiceConfigurationError e) { - LOG.error("Could not load service provider for jdbc dialects factory.", e); - throw new RuntimeException( - "Could not load service provider for jdbc dialects factory.", e); + LOG.error("Could not load service provider for jdbc factory.", e); + throw new RuntimeException("Could not load service provider for jdbc factory.", e); } } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialect.java similarity index 99% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialect.java index d838ebe6..29b58427 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialect.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.dialect; +package org.apache.flink.connector.jdbc.core.table.dialect; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.api.ValidationException; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialectConverter.java similarity index 98% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialectConverter.java index 1a19d127..95d9bcdd 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/AbstractDialectConverter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.converter; +package org.apache.flink.connector.jdbc.core.table.dialect; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; @@ -47,7 +47,7 @@ import java.time.LocalTime; import static org.apache.flink.util.Preconditions.checkNotNull; /** Base class for all converters that convert between JDBC object and Flink internal object. */ -public abstract class AbstractJdbcRowConverter implements JdbcRowConverter { +public abstract class AbstractDialectConverter implements JdbcDialectConverter { protected final RowType rowType; protected final JdbcDeserializationConverter[] toInternalConverters; @@ -56,7 +56,7 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter { public abstract String converterName(); - public AbstractJdbcRowConverter(RowType rowType) { + public AbstractDialectConverter(RowType rowType) { this.rowType = checkNotNull(rowType); this.fieldTypes = rowType.getFields().stream() diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialect.java similarity index 96% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialect.java index 6cc6bbd5..b748d308 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialect.java @@ -16,10 +16,10 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.dialect; +package org.apache.flink.connector.jdbc.core.table.dialect; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.RowType; @@ -30,7 +30,7 @@ import java.util.Optional; * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable * and stateless. * - * @see JdbcDialectFactory + * @see JdbcFactory */ @PublicEvolving public interface JdbcDialect extends Serializable { @@ -48,7 +48,7 @@ public interface JdbcDialect extends Serializable { * @param rowType the given row type * @return a row converter for the database */ - JdbcRowConverter getRowConverter(RowType rowType); + JdbcDialectConverter getRowConverter(RowType rowType); /** * Get limit clause to limit the number of emitted row from the jdbc source. diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialectConverter.java similarity index 93% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialectConverter.java index c1f1b1f5..77bb961e 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/dialect/JdbcDialectConverter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.converter; +package org.apache.flink.connector.jdbc.core.table.dialect; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; @@ -31,7 +31,7 @@ import java.sql.SQLException; * structure {@link RowData}. */ @PublicEvolving -public interface JdbcRowConverter extends Serializable { +public interface JdbcDialectConverter extends Serializable { /** * Convert data retrieved from {@link ResultSet} to internal {@link RowData}. diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java index 7592cf2e..c290917b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java @@ -34,8 +34,8 @@ public class CrateDBDialect extends AbstractPostgresCompatibleDialect { } @Override - public CrateDBRowConverter getRowConverter(RowType rowType) { - return new CrateDBRowConverter(rowType); + public CrateDBDialectConverter getRowConverter(RowType rowType) { + return new CrateDBDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectConverter.java similarity index 87% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectConverter.java index 1412b0d5..80f9db25 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectConverter.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.databases.cratedb.dialect; -import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter; +import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleDialectConverter; import org.apache.flink.table.types.logical.RowType; import io.crate.shade.org.postgresql.jdbc.PgArray; @@ -27,11 +27,11 @@ import io.crate.shade.org.postgresql.jdbc.PgArray; * Runtime converter that responsible to convert between JDBC object and Flink internal object for * CrateDB. */ -public class CrateDBRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> { +public class CrateDBDialectConverter extends AbstractPostgresCompatibleDialectConverter<PgArray> { private static final long serialVersionUID = 1L; - public CrateDBRowConverter(RowType rowType) { + public CrateDBDialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBFactory.java similarity index 83% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBFactory.java index ac5a8e7d..fcf31b24 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBFactory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.cratedb.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link CrateDBDialect}. */ @Internal -public class CrateDBDialectFactory implements JdbcDialectFactory { +public class CrateDBFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:crate:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new CrateDBDialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java index e617a9fc..49f71c5b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Dialect.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.jdbc.databases.db2.dialect; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; -import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -42,8 +42,8 @@ public class Db2Dialect extends AbstractDialect { private static final int MIN_DECIMAL_PRECISION = 1; @Override - public JdbcRowConverter getRowConverter(RowType rowType) { - return new Db2RowConverter(rowType); + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new Db2DialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectConverter.java similarity index 87% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectConverter.java index 8557c822..4ad3fc49 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2RowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectConverter.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.databases.db2.dialect; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -26,7 +26,7 @@ import org.apache.flink.table.types.logical.RowType; * Runtime converter that responsible to convert between JDBC object and Flink internal object for * Db2. */ -public class Db2RowConverter extends AbstractJdbcRowConverter { +public class Db2DialectConverter extends AbstractDialectConverter { private static final long serialVersionUID = 1L; @@ -35,7 +35,7 @@ public class Db2RowConverter extends AbstractJdbcRowConverter { return "Db2"; } - public Db2RowConverter(RowType rowType) { + public Db2DialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Factory.java similarity index 83% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Factory.java index c3a21829..268a20c2 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2Factory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.db2.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link Db2Dialect}. */ @Internal -public class Db2DialectFactory implements JdbcDialectFactory { +public class Db2Factory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:db2:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new Db2Dialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java index dad028f0..93c6c325 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialect.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.jdbc.databases.derby.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; -import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -45,8 +45,8 @@ public class DerbyDialect extends AbstractDialect { private static final int MIN_DECIMAL_PRECISION = 1; @Override - public JdbcRowConverter getRowConverter(RowType rowType) { - return new DerbyRowConverter(rowType); + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new DerbyDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectConverter.java similarity index 86% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectConverter.java index 7f05a303..846f2c16 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectConverter.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.databases.derby.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.types.logical.RowType; /** @@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.RowType; * Derby. */ @Internal -public class DerbyRowConverter extends AbstractJdbcRowConverter { +public class DerbyDialectConverter extends AbstractDialectConverter { private static final long serialVersionUID = 1L; @@ -36,7 +36,7 @@ public class DerbyRowConverter extends AbstractJdbcRowConverter { return "Derby"; } - public DerbyRowConverter(RowType rowType) { + public DerbyDialectConverter(RowType rowType) { super(rowType); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyFactory.java similarity index 83% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyFactory.java index 280038dd..639d8695 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/derby/dialect/DerbyFactory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.derby.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link DerbyDialect}. */ @Internal -public class DerbyDialectFactory implements JdbcDialectFactory { +public class DerbyFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:derby:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new DerbyDialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java index 995e0912..235136dd 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlTypeMapper.java @@ -120,7 +120,8 @@ public class MySqlTypeMapper implements JdbcDialectTypeMapper { // VARBINARY(n) is not supported in MySqlDialect when 'n' is not equals to // Integer.MAX_VALUE. Please see // org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect#supportedTypes and - // org.apache.flink.connector.jdbc.dialect.AbstractDialect#validate for more + // org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect#validate for + // more // details. return DataTypes.BYTES(); case MYSQL_TINYINT: diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLDialectConverter.java similarity index 86% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLDialectConverter.java index ef5d0fd7..c9c883f8 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLDialectConverter.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.databases.mysql.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.types.logical.RowType; /** @@ -27,7 +27,7 @@ import org.apache.flink.table.types.logical.RowType; * MySQL. */ @Internal -public class MySQLRowConverter extends AbstractJdbcRowConverter { +public class MySQLDialectConverter extends AbstractDialectConverter { private static final long serialVersionUID = 1L; @@ -36,7 +36,7 @@ public class MySQLRowConverter extends AbstractJdbcRowConverter { return "MySQL"; } - public MySQLRowConverter(RowType rowType) { + public MySQLDialectConverter(RowType rowType) { super(rowType); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java index 6d79d300..ba25e0eb 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialect.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.jdbc.databases.mysql.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; -import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -51,8 +51,8 @@ public class MySqlDialect extends AbstractDialect { private static final String REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements"; @Override - public JdbcRowConverter getRowConverter(RowType rowType) { - return new MySQLRowConverter(rowType); + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new MySQLDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlFactory.java similarity index 83% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlFactory.java index 71ae04cc..a3544a01 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySqlFactory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.mysql.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link MySqlDialect}. */ @Internal -public class MySqlDialectFactory implements JdbcDialectFactory { +public class MySqlFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:mysql:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new MySqlDialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java index 885d3392..d190ea6a 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java @@ -19,10 +19,10 @@ package org.apache.flink.connector.jdbc.databases.oceanbase.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect; import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect; -import org.apache.flink.connector.jdbc.dialect.AbstractDialect; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -84,8 +84,8 @@ public class OceanBaseDialect extends AbstractDialect { } @Override - public JdbcRowConverter getRowConverter(RowType rowType) { - return new OceanBaseRowConverter(rowType); + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new OceanBaseDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectConverter.java similarity index 96% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectConverter.java index 95981217..1bc66254 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectConverter.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.databases.oceanbase.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; @@ -40,7 +40,7 @@ import java.time.LocalDateTime; * OceanBase. */ @Internal -public class OceanBaseRowConverter extends AbstractJdbcRowConverter { +public class OceanBaseDialectConverter extends AbstractDialectConverter { private static final long serialVersionUID = 1L; @@ -49,7 +49,7 @@ public class OceanBaseRowConverter extends AbstractJdbcRowConverter { return "OceanBase"; } - public OceanBaseRowConverter(RowType rowType) { + public OceanBaseDialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseFactory.java similarity index 81% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseFactory.java index 46152379..1455e310 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseFactory.java @@ -19,14 +19,14 @@ package org.apache.flink.connector.jdbc.databases.oceanbase.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import javax.annotation.Nonnull; /** Factory for {@link OceanBaseDialect}. */ @Internal -public class OceanBaseDialectFactory implements JdbcDialectFactory { +public class OceanBaseFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { @@ -34,13 +34,13 @@ public class OceanBaseDialectFactory implements JdbcDialectFactory { } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { throw new UnsupportedOperationException( "Can't create JdbcDialect without compatible mode for OceanBase"); } @Override - public JdbcDialect create(@Nonnull String compatibleMode) { + public JdbcDialect createDialect(@Nonnull String compatibleMode) { return new OceanBaseDialect(compatibleMode); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java index 1ed061fe..d15a4a5b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialect.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.jdbc.databases.oracle.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; -import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -47,8 +47,8 @@ public class OracleDialect extends AbstractDialect { private static final int MIN_DECIMAL_PRECISION = 1; @Override - public JdbcRowConverter getRowConverter(RowType rowType) { - return new OracleRowConverter(rowType); + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new OracleDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectConverter.java similarity index 97% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectConverter.java index b476e661..4acc8f97 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectConverter.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.databases.oracle.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; @@ -50,11 +50,11 @@ import java.time.ZonedDateTime; * Oracle. */ @Internal -public class OracleRowConverter extends AbstractJdbcRowConverter { +public class OracleDialectConverter extends AbstractDialectConverter { private static final long serialVersionUID = 1L; - public OracleRowConverter(RowType rowType) { + public OracleDialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleFactory.java similarity index 83% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleFactory.java index b7bc60ea..78f124b1 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OracleFactory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.oracle.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link OracleDialect}. */ @Internal -public class OracleDialectFactory implements JdbcDialectFactory { +public class OracleFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:oracle:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new OracleDialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java index d0924aee..6973e597 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java @@ -31,8 +31,8 @@ public class PostgresDialect extends AbstractPostgresCompatibleDialect { private static final long serialVersionUID = 1L; @Override - public PostgresRowConverter getRowConverter(RowType rowType) { - return new PostgresRowConverter(rowType); + public PostgresDialectConverter getRowConverter(RowType rowType) { + return new PostgresDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectConverter.java similarity index 87% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectConverter.java index 2e6e14c2..7ac579a0 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectConverter.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.databases.postgres.dialect; -import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter; +import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleDialectConverter; import org.apache.flink.table.types.logical.RowType; import org.postgresql.jdbc.PgArray; @@ -27,11 +27,11 @@ import org.postgresql.jdbc.PgArray; * Runtime converter that responsible to convert between JDBC object and Flink internal object for * PostgreSQL. */ -public class PostgresRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> { +public class PostgresDialectConverter extends AbstractPostgresCompatibleDialectConverter<PgArray> { private static final long serialVersionUID = 1L; - public PostgresRowConverter(RowType rowType) { + public PostgresDialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresFactory.java similarity index 83% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresFactory.java index 6286f144..5fe6df8f 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresFactory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.postgres.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link PostgresDialect}. */ @Internal -public class PostgresDialectFactory implements JdbcDialectFactory { +public class PostgresFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:postgresql:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new PostgresDialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java index 91469a6b..f19e6f89 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialect.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.jdbc.databases.sqlserver.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; -import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -126,8 +126,8 @@ public class SqlServerDialect extends AbstractDialect { } @Override - public JdbcRowConverter getRowConverter(RowType rowType) { - return new SqlServerRowConverter(rowType); + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new SqlServerDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectConverter.java similarity index 88% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectConverter.java index 5f3ac2b4..9c7451b0 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectConverter.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.databases.sqlserver.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -28,7 +28,7 @@ import org.apache.flink.table.types.logical.RowType; * MsSql. */ @Internal -public class SqlServerRowConverter extends AbstractJdbcRowConverter { +public class SqlServerDialectConverter extends AbstractDialectConverter { private static final long serialVersionUID = 1L; @@ -37,7 +37,7 @@ public class SqlServerRowConverter extends AbstractJdbcRowConverter { return "SqlServer"; } - public SqlServerRowConverter(RowType rowType) { + public SqlServerDialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerFactory.java similarity index 83% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerFactory.java index 9b6ee18c..695cca72 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerFactory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.sqlserver.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link SqlServerDialect}. */ @Internal -public class SqlServerDialectFactory implements JdbcDialectFactory { +public class SqlServerFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:sqlserver:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new SqlServerDialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java index 041d24c3..5c02dda0 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialect.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.jdbc.databases.trino.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; -import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; @@ -45,8 +45,8 @@ public class TrinoDialect extends AbstractDialect { private static final int MIN_DECIMAL_PRECISION = 1; @Override - public JdbcRowConverter getRowConverter(RowType rowType) { - return new TrinoRowConverter(rowType); + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new TrinoDialectConverter(rowType); } @Override diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoRowConverter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectConverter.java similarity index 91% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoRowConverter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectConverter.java index 92a839e9..a37d6e90 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoRowConverter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectConverter.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.databases.trino.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; @@ -32,11 +32,11 @@ import java.math.BigDecimal; * Trino. */ @Internal -public class TrinoRowConverter extends AbstractJdbcRowConverter { +public class TrinoDialectConverter extends AbstractDialectConverter { private static final long serialVersionUID = 1L; - public TrinoRowConverter(RowType rowType) { + public TrinoDialectConverter(RowType rowType) { super(rowType); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoFactory.java similarity index 83% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectFactory.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoFactory.java index a0e2edda..9283b8b4 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoDialectFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoFactory.java @@ -19,19 +19,19 @@ package org.apache.flink.connector.jdbc.databases.trino.dialect; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; /** Factory for {@link TrinoDialect}. */ @Internal -public class TrinoDialectFactory implements JdbcDialectFactory { +public class TrinoFactory implements JdbcFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:trino:"); } @Override - public JdbcDialect create() { + public JdbcDialect createDialect() { return new TrinoDialect(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java index 0ca425f3..2845c75b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.dialect; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect; import org.apache.flink.table.types.logical.LogicalTypeRoot; import java.util.Arrays; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java index 0842177f..17a25c6f 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.internal.executor; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; import org.apache.flink.connector.jdbc.statement.StatementFactory; import org.apache.flink.table.data.RowData; @@ -43,9 +43,9 @@ public final class TableInsertOrUpdateStatementExecutor private final StatementFactory insertStmtFactory; private final StatementFactory updateStmtFactory; - private final JdbcRowConverter existSetter; - private final JdbcRowConverter insertSetter; - private final JdbcRowConverter updateSetter; + private final JdbcDialectConverter existSetter; + private final JdbcDialectConverter insertSetter; + private final JdbcDialectConverter updateSetter; private final Function<RowData, RowData> keyExtractor; @@ -57,9 +57,9 @@ public final class TableInsertOrUpdateStatementExecutor StatementFactory existStmtFactory, StatementFactory insertStmtFactory, StatementFactory updateStmtFactory, - JdbcRowConverter existSetter, - JdbcRowConverter insertSetter, - JdbcRowConverter updateSetter, + JdbcDialectConverter existSetter, + JdbcDialectConverter insertSetter, + JdbcDialectConverter updateSetter, Function<RowData, RowData> keyExtractor) { this.existStmtFactory = checkNotNull(existStmtFactory); this.insertStmtFactory = checkNotNull(insertStmtFactory); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java index d7d74ba2..59866b51 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.internal.executor; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; import org.apache.flink.connector.jdbc.statement.StatementFactory; import org.apache.flink.table.data.RowData; @@ -35,7 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public final class TableSimpleStatementExecutor implements JdbcBatchStatementExecutor<RowData> { private final StatementFactory stmtFactory; - private final JdbcRowConverter converter; + private final JdbcDialectConverter converter; private transient FieldNamedPreparedStatement st; @@ -43,7 +43,8 @@ public final class TableSimpleStatementExecutor implements JdbcBatchStatementExe * Keep in mind object reuse: if it's on then key extractor may be required to return new * object. */ - public TableSimpleStatementExecutor(StatementFactory stmtFactory, JdbcRowConverter converter) { + public TableSimpleStatementExecutor( + StatementFactory stmtFactory, JdbcDialectConverter converter) { this.stmtFactory = checkNotNull(stmtFactory); this.converter = checkNotNull(converter); } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java index 2083b437..753e5cab 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java @@ -20,8 +20,8 @@ package org.apache.flink.connector.jdbc.internal.options; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -179,7 +179,7 @@ public class InternalJdbcConnectionOptions extends JdbcConnectionOptions { /** * optional, Handle the SQL dialect of jdbc driver. If not set, it will be infer by {@link - * JdbcDialectLoader#load} from DB url. + * JdbcFactoryLoader#load} from DB url. */ public Builder setDialect(JdbcDialect dialect) { this.dialect = dialect; @@ -205,7 +205,7 @@ public class InternalJdbcConnectionOptions extends JdbcConnectionOptions { if (classLoader == null) { classLoader = Thread.currentThread().getContextClassLoader(); } - this.dialect = JdbcDialectLoader.load(dbURL, compatibleMode, classLoader); + this.dialect = JdbcFactoryLoader.loadDialect(dbURL, classLoader, compatibleMode); } if (this.driverName == null) { Optional<String> optional = dialect.defaultDriverName(); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java index a889a679..776fe431 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions.java @@ -17,7 +17,7 @@ package org.apache.flink.connector.jdbc.internal.options; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java index 58363a52..17ba340d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; @@ -131,7 +131,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam private static void validateDataTypeWithJdbcDialect( DataType dataType, String url, String compatibleMode, ClassLoader classLoader) { - final JdbcDialect dialect = JdbcDialectLoader.load(url, compatibleMode, classLoader); + final JdbcDialect dialect = JdbcFactoryLoader.loadDialect(url, classLoader, compatibleMode); dialect.validate((RowType) dataType.getLogicalType()); } @@ -144,8 +144,8 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam .setDBUrl(url) .setTableName(readableConfig.get(TABLE_NAME)) .setDialect( - JdbcDialectLoader.load( - url, readableConfig.get(COMPATIBLE_MODE), classLoader)) + JdbcFactoryLoader.loadDialect( + url, classLoader, readableConfig.get(COMPATIBLE_MODE))) .setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null)) .setConnectionCheckTimeoutSeconds( (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds()); @@ -283,7 +283,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) { String jdbcUrl = config.get(URL); - JdbcDialectLoader.load(jdbcUrl, config.get(COMPATIBLE_MODE), classLoader); + JdbcFactoryLoader.loadDialect(jdbcUrl, classLoader, config.get(COMPATIBLE_MODE)); checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD}); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java index c8ef2e33..bc2c1732 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java index 745d2c02..554aaaf2 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatBuilder.java @@ -19,9 +19,9 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor; @@ -174,7 +174,7 @@ public class JdbcOutputFormatBuilder implements Serializable { private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor( JdbcDialect dialect, String[] fieldNames, LogicalType[] fieldTypes, final String sql) { - final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes)); + final JdbcDialectConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes)); return new TableSimpleStatementExecutor( connection -> FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames), diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java index 7e5dc2a3..a4990a73 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java @@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; @@ -67,7 +67,7 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit> private String queryTemplate; private int resultSetType; private int resultSetConcurrency; - private JdbcRowConverter rowConverter; + private JdbcDialectConverter rowConverter; private TypeInformation<RowData> rowDataTypeInfo; private transient PreparedStatement statement; @@ -82,7 +82,7 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit> String queryTemplate, int resultSetType, int resultSetConcurrency, - JdbcRowConverter rowConverter, + JdbcDialectConverter rowConverter, TypeInformation<RowData> rowDataTypeInfo) { this.connectionProvider = connectionProvider; this.fetchSize = fetchSize; @@ -303,7 +303,7 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit> private Boolean autoCommit; private Object[][] parameterValues; private String queryTemplate; - private JdbcRowConverter rowConverter; + private JdbcDialectConverter rowConverter; private TypeInformation<RowData> rowDataTypeInfo; private int resultSetType = ResultSet.TYPE_FORWARD_ONLY; private int resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; @@ -347,7 +347,7 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit> return this; } - public Builder setRowConverter(JdbcRowConverter rowConverter) { + public Builder setRowConverter(JdbcDialectConverter rowConverter) { this.rowConverter = rowConverter; return this; } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java index 23ddf3c7..d3e95b31 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java @@ -20,10 +20,10 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; import org.apache.flink.table.data.RowData; @@ -62,8 +62,8 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private final JdbcConnectionProvider connectionProvider; private final String[] keyNames; private final int maxRetryTimes; - private final JdbcRowConverter jdbcRowConverter; - private final JdbcRowConverter lookupKeyRowConverter; + private final JdbcDialectConverter jdbcDialectConverter; + private final JdbcDialectConverter lookupKeyRowConverter; private final List<String> resolvedPredicates; private final Serializable[] pushdownParams; @@ -105,7 +105,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction { options.getDialect() .getSelectFromStatement(options.getTableName(), fieldNames, keyNames); JdbcDialect jdbcDialect = options.getDialect(); - this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType); + this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType); this.lookupKeyRowConverter = jdbcDialect.getRowConverter( RowType.of( @@ -151,7 +151,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction { try (ResultSet resultSet = statement.executeQuery()) { ArrayList<RowData> rows = new ArrayList<>(); while (resultSet.next()) { - RowData row = jdbcRowConverter.toInternal(resultSet); + RowData row = jdbcDialectConverter.toInternal(resultSet); rows.add(row); } rows.trimToSize(); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java index 6a191f5a..3fc1e422 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.table; -import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -29,14 +29,14 @@ import java.sql.SQLException; /** The result extractor for {@link RowData}. */ public class RowDataResultExtractor implements ResultExtractor<RowData> { - private final JdbcRowConverter jdbcRowConverter; + private final JdbcDialectConverter jdbcDialectConverter; - public RowDataResultExtractor(JdbcRowConverter jdbcRowConverter) { - this.jdbcRowConverter = Preconditions.checkNotNull(jdbcRowConverter); + public RowDataResultExtractor(JdbcDialectConverter jdbcDialectConverter) { + this.jdbcDialectConverter = Preconditions.checkNotNull(jdbcDialectConverter); } @Override public RowData extract(ResultSet resultSet) throws SQLException { - return jdbcRowConverter.toInternal(resultSet); + return jdbcDialectConverter.toInternal(resultSet); } } diff --git a/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory similarity index 69% rename from flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory rename to flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory index 9a3cf26c..13eecd1a 100644 --- a/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory +++ b/flink-connector-jdbc-core/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.table.JdbcFactory @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyDialectFactory -org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialectFactory -org.apache.flink.connector.jdbc.databases.oceanbase.dialect.OceanBaseDialectFactory -org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactory -org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory -org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory -org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory -org.apache.flink.connector.jdbc.databases.db2.dialect.Db2DialectFactory -org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoDialectFactory +org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyFactory +org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlFactory +org.apache.flink.connector.jdbc.databases.oceanbase.dialect.OceanBaseFactory +org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresFactory +org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleFactory +org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerFactory +org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBFactory +org.apache.flink.connector.jdbc.databases.db2.dialect.Db2Factory +org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoFactory diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractDialectConverterTest.java similarity index 85% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractDialectConverterTest.java index 9f9fee1e..bfb77392 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractDialectConverterTest.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.jdbc.converter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; @@ -31,14 +33,14 @@ import java.time.LocalDateTime; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link AbstractJdbcRowConverter}. */ -class AbstractJdbcRowConverterTest { +/** Test for {@link AbstractDialectConverter}. */ +class AbstractDialectConverterTest { @Test void testExternalLocalDateTimeToTimestamp() throws Exception { RowType rowType = RowType.of(new IntType(), new TimestampType(3)); - JdbcRowConverter rowConverter = - new AbstractJdbcRowConverter(rowType) { + JdbcDialectConverter rowConverter = + new AbstractDialectConverter(rowType) { private static final long serialVersionUID = 1L; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java index 162c3bc1..8a8ea588 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2PreparedStatementTest.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.jdbc.databases.db2.dialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.junit.jupiter.api.Test; @@ -29,7 +29,8 @@ import static org.assertj.core.api.Assertions.assertThat; class Db2PreparedStatementTest { private final JdbcDialect dialect = - JdbcDialectLoader.load("jdbc:db2://localhost:3306/test", getClass().getClassLoader()); + JdbcFactoryLoader.loadDialect( + "jdbc:db2://localhost:3306/test", getClass().getClassLoader()); private final String[] fieldNames = new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; private final String[] keyFields = new String[] {"id", "__field_3__"}; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java index f69f6a33..372493b2 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/dialect/OraclePreparedStatementTest.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.jdbc.databases.oracle.dialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; import org.junit.jupiter.api.Test; @@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat; class OraclePreparedStatementTest { private final JdbcDialect dialect = - JdbcDialectLoader.load( + JdbcFactoryLoader.loadDialect( "jdbc:oracle://localhost:3306/test", getClass().getClassLoader()); private final String[] fieldNames = new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java index 33fc3ed3..fbe57835 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/sqlserver/dialect/SqlServerPreparedStatementTest.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.jdbc.databases.sqlserver.dialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.junit.jupiter.api.Test; @@ -29,7 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat; class SqlServerPreparedStatementTest { private final JdbcDialect dialect = - JdbcDialectLoader.load( + JdbcFactoryLoader.loadDialect( "jdbc:sqlserver://localhost:3306/test", getClass().getClassLoader()); private final String[] fieldNames = new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java index 847f1a7f..8b3d9854 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/databases/trino/dialect/TrinoPreparedStatementTest.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.jdbc.databases.trino.dialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; import org.junit.jupiter.api.Test; @@ -37,7 +37,8 @@ import static org.assertj.core.api.Assertions.assertThat; class TrinoPreparedStatementTest { private final JdbcDialect dialect = - JdbcDialectLoader.load("jdbc:trino://localhost:3306/test", getClass().getClassLoader()); + JdbcFactoryLoader.loadDialect( + "jdbc:trino://localhost:3306/test", getClass().getClassLoader()); private final String[] fieldNames = new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; private final String[] keyFields = new String[] {"id", "__field_3__"}; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java index b1d5684e..ff969b7d 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.jdbc.statement; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.junit.jupiter.api.Test; @@ -35,7 +35,8 @@ import static org.assertj.core.api.Assertions.assertThat; class FieldNamedPreparedStatementImplTest { private final JdbcDialect dialect = - JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", getClass().getClassLoader()); + JdbcFactoryLoader.loadDialect( + "jdbc:mysql://localhost:3306/test", getClass().getClassLoader()); private final String[] fieldNames = new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; private final String[] keyFields = new String[] {"id", "__field_3__"}; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java index 7209dbfa..cad629f0 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java @@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.JdbcTestBase; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.core.table.JdbcFactoryLoader; import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; import org.apache.flink.connector.jdbc.internal.RowJdbcOutputFormat; @@ -62,7 +62,7 @@ class JdbcAppendOnlyWriterTest extends JdbcTestBase { InternalJdbcConnectionOptions.builder() .setDBUrl(getMetadata().getJdbcUrl()) .setDialect( - JdbcDialectLoader.load( + JdbcFactoryLoader.loadDialect( getMetadata() .getJdbcUrl(), getClass() diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java index ec08391d..147a254e 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.connector.jdbc.JdbcTestBase; -import org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyDialectFactory; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyFactory; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableException; @@ -221,7 +221,7 @@ class JdbcFilterPushdownPreparedStatementVisitorTest { Serializable[] expectedParams) { List<ResolvedExpression> resolved = resolveSQLFilterToExpression(inputExpr, schema); assertThat(resolved.size()).isEqualTo(1); - JdbcDialect dialect = new DerbyDialectFactory().create(); + JdbcDialect dialect = new DerbyFactory().createDialect(); JdbcFilterPushdownPreparedStatementVisitor visitor = new JdbcFilterPushdownPreparedStatementVisitor(dialect::quoteIdentifier); ParameterizedPredicate pred = resolved.get(0).accept(visitor).get(); diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java index b2e4deea..3f8d19ea 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java @@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.connector.jdbc.JdbcDataTestBase; import org.apache.flink.connector.jdbc.JdbcTestFixture; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml index 68d05f08..5e507f93 100644 --- a/flink-connector-jdbc/pom.xml +++ b/flink-connector-jdbc/pom.xml @@ -42,6 +42,22 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-annotations</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + </dependency> + </dependencies> <build> @@ -60,7 +76,11 @@ under the License. <shadeTestJar>true</shadeTestJar> <artifactSet> <excludes> + <exclude>org.apache.flink:flink-annotations</exclude> + <exclude>org.apache.flink:flink-core</exclude> <exclude>org.apache.flink:flink-connector-base</exclude> + <exclude>org.apache.flink:flink-table-common</exclude> + <exclude>org.apache.flink:flink-connector-jdbc-core:tests</exclude> </excludes> </artifactSet> <filters combine.children="append"> diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java similarity index 61% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java index ef5d0fd7..f45f0c09 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/dialect/MySQLRowConverter.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java @@ -16,27 +16,19 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.databases.mysql.dialect; +package org.apache.flink.connector.jdbc.converter; -import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialectConverter; import org.apache.flink.table.types.logical.RowType; /** - * Runtime converter that responsible to convert between JDBC object and Flink internal object for - * MySQL. - */ -@Internal -public class MySQLRowConverter extends AbstractJdbcRowConverter { - - private static final long serialVersionUID = 1L; - - @Override - public String converterName() { - return "MySQL"; - } - - public MySQLRowConverter(RowType rowType) { + * Base class for all converters that convert between JDBC object and Flink internal object. + * + * @deprecated use AbstractDialectConverter + * */ +@Deprecated +public abstract class AbstractJdbcRowConverter extends AbstractDialectConverter { + public AbstractJdbcRowConverter(RowType rowType) { super(rowType); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java similarity index 59% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java copy to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java index c3a21829..923fe222 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/JdbcRowConverter.java @@ -16,22 +16,18 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.databases.db2.dialect; +package org.apache.flink.connector.jdbc.converter; -import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialectConverter; +import org.apache.flink.table.data.RowData; -/** Factory for {@link Db2Dialect}. */ -@Internal -public class Db2DialectFactory implements JdbcDialectFactory { - @Override - public boolean acceptsURL(String url) { - return url.startsWith("jdbc:db2:"); - } - - @Override - public JdbcDialect create() { - return new Db2Dialect(); - } -} +/** + * Converter that is responsible to convert between JDBC object and Flink SQL internal data + * structure {@link RowData}. + * + * @deprecated use JdbcDialectConverter + */ +@Deprecated +@PublicEvolving +public interface JdbcRowConverter extends JdbcDialectConverter {} diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java new file mode 100644 index 00000000..28b2d8a2 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.dialect; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect; + +/** + * Base class for {@link JdbcDialect JdbcDialects} that implements basic data type validation and + * the construction of basic {@code INSERT}, {@code UPDATE}, {@code DELETE}, and {@code SELECT} + * statements. + * + * <p>Implementors should be careful to check the default SQL statements are performant for their + * specific dialect and override them if necessary. + * + * @deprecated use org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect + */ +@Deprecated +@PublicEvolving +public abstract class AbstractDialect + extends org.apache.flink.connector.jdbc.core.table.dialect.AbstractDialect + implements org.apache.flink.connector.jdbc.dialect.JdbcDialect {} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java similarity index 59% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java index c3a21829..54a61d4c 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/databases/db2/dialect/Db2DialectFactory.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java @@ -16,22 +16,19 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.databases.db2.dialect; +package org.apache.flink.connector.jdbc.dialect; -import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.jdbc.dialect.JdbcDialect; -import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; -/** Factory for {@link Db2Dialect}. */ -@Internal -public class Db2DialectFactory implements JdbcDialectFactory { - @Override - public boolean acceptsURL(String url) { - return url.startsWith("jdbc:db2:"); - } - - @Override - public JdbcDialect create() { - return new Db2Dialect(); - } -} +/** + * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable + * and stateless. + * + * @see JdbcFactory + * @deprecated use org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect + */ +@Deprecated +@PublicEvolving +public interface JdbcDialect + extends org.apache.flink.connector.jdbc.core.table.dialect.JdbcDialect {} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java similarity index 50% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java index 84f4bd78..3fc31c49 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java @@ -18,41 +18,34 @@ package org.apache.flink.connector.jdbc.dialect; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.jdbc.core.table.JdbcFactory; import org.apache.flink.util.StringUtils; /** - * A factory to create a specific {@link JdbcDialect}. This factory is used with Java's Service - * Provider Interfaces (SPI) for discovering. + * A factory to create a specific {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect}. This + * factory is used with Java's Service Provider Interfaces (SPI) for discovering. * * <p>Classes that implement this interface can be added to the - * "META_INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory" file of a JAR file - * in the current classpath to be found. + * "META_INF/services/org.apache.flink.connector.jdbc.JdbcDialectFactory" file of a JAR file in the + * current classpath to be found. * - * @see JdbcDialect + * @see org.apache.flink.connector.jdbc.dialect.JdbcDialect + * @deprecated use JdbcFactory */ -@PublicEvolving -public interface JdbcDialectFactory { +@Deprecated +public interface JdbcDialectFactory extends JdbcFactory { /** - * Retrieves whether the dialect thinks that it can open a connection to the given URL. - * Typically, dialects will return <code>true</code> if they understand the sub-protocol - * specified in the URL and <code>false</code> if they do not. - * - * @param url the URL of the database - * @return <code>true</code> if this dialect understands the given URL; <code>false</code> - * otherwise. + * @deprecated + * @return Creates a new instance of the {@link + * org.apache.flink.connector.jdbc.dialect.JdbcDialect}. */ - boolean acceptsURL(String url); - - /** @return Creates a new instance of the {@link JdbcDialect}. */ JdbcDialect create(); /** - * Creates a new instance of the {@link JdbcDialect} based on compatible mode. - * + * @deprecated * @param compatibleMode the compatible mode of database - * @return a new instance of {@link JdbcDialect} + * @return a new instance of {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect} */ default JdbcDialect create(String compatibleMode) { if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) { @@ -61,4 +54,22 @@ public interface JdbcDialectFactory { throw new UnsupportedOperationException( "Not supported option 'compatible-mode' with value: " + compatibleMode); } + + /** + * @return a new instance of the {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect}. + */ + default JdbcDialect createDialect() { + return create(); + } + + /** + * Creates a new instance of the {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect} + * based on compatible mode. + * + * @param compatibleMode the compatible mode of database + * @return a new instance of {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect} + */ + default JdbcDialect createDialect(String compatibleMode) { + return create(compatibleMode); + } }
