wuchong commented on a change in pull request #12176:
URL: https://github.com/apache/flink/pull/12176#discussion_r426165412



##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import 
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSource} for JDBC.
+ */
+@Internal
+public class JdbcDynamicTableSource implements ScanTableSource, 
LookupTableSource {
+
+       private final JdbcOptions options;
+       private final JdbcReadOptions readOptions;
+       private final JdbcLookupOptions lookupOptions;
+       private final TableSchema schema;
+       private final int[] selectFields;
+       private final String dialectName;
+
+       public JdbcDynamicTableSource(
+                       JdbcOptions options,
+                       JdbcReadOptions readOptions,
+                       JdbcLookupOptions lookupOptions,
+                       TableSchema schema,
+                       int[] selectFields) {
+               this.options = options;
+               this.readOptions = readOptions;
+               this.lookupOptions = lookupOptions;
+               this.schema = schema;
+               this.selectFields = selectFields;
+               this.dialectName = options.getDialect().dialectName();
+       }
+
+       @Override
+       public LookupRuntimeProvider 
getLookupRuntimeProvider(LookupTableSource.Context context) {
+               // JDBC only support non-nested look up keys
+               String[] keyNames = new String[context.getKeys().length];
+               for (int i = 0; i < keyNames.length; i++) {
+                       int[] innerKeyArr = context.getKeys()[i];
+                       Preconditions.checkArgument(innerKeyArr.length == 1,
+                               "JDBC only support non-nested look up keys");
+                       keyNames[i] = schema.getFieldNames()[innerKeyArr[0]];
+               }
+               final RowType rowType = (RowType) 
schema.toRowDataType().getLogicalType();
+
+               return TableFunctionProvider.of(new JdbcRowDataLookupFunction(
+                       options,
+                       lookupOptions,
+                       schema.getFieldNames(),
+                       schema.getFieldDataTypes(),
+                       keyNames,
+                       rowType));
+       }
+
+       @Override
+       public ScanRuntimeProvider 
getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
+               final JdbcRowDataInputFormat.Builder builder = 
JdbcRowDataInputFormat.builder()
+                       .setDrivername(options.getDriverName())
+                       .setDBUrl(options.getDbURL())
+                       .setUsername(options.getUsername().orElse(null))
+                       .setPassword(options.getPassword().orElse(null));
+
+               if (readOptions.getFetchSize() != 0) {
+                       builder.setFetchSize(readOptions.getFetchSize());
+               }
+               final JdbcDialect dialect = options.getDialect();
+               String query = dialect.getSelectFromStatement(
+                       options.getTableName(), schema.getFieldNames(), new 
String[0]);
+               if (readOptions.getPartitionColumnName().isPresent()) {
+                       long lowerBound = 
readOptions.getPartitionLowerBound().get();
+                       long upperBound = 
readOptions.getPartitionUpperBound().get();
+                       int numPartitions = 
readOptions.getNumPartitions().get();
+                       builder.setParametersProvider(
+                               new 
JdbcNumericBetweenParametersProvider(lowerBound, 
upperBound).ofBatchNum(numPartitions));
+                       query += " WHERE " +
+                               
dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) +
+                               " BETWEEN ? AND ?";
+               }
+               builder.setQuery(query);
+               final RowType rowType = RowType.of(
+                       Arrays.stream(schema.getFieldDataTypes())
+                               .map(DataType::getLogicalType)
+                               .toArray(LogicalType[]::new),
+                       schema.getFieldNames());
+               builder.setRowConverter(dialect.getRowConverter(rowType));
+               @SuppressWarnings("unchecked")

Review comment:
       Move this into a separate method.
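For illustration only, since the comment does not spell out the exact extraction, the `RowType` construction quoted above could be pulled into a small private helper (the name `physicalRowType` is hypothetical):

```java
// Hypothetical helper sketched from the quoted code; only the RowType
// construction moves, behavior is unchanged.
private RowType physicalRowType() {
	return RowType.of(
		Arrays.stream(schema.getFieldDataTypes())
			.map(DataType::getLogicalType)
			.toArray(LogicalType[]::new),
		schema.getFieldNames());
}
```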

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/AbstractJdbcRowConverter.java
##########
@@ -18,54 +18,273 @@
 
 package org.apache.flink.connector.jdbc.internal.converter;
 
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
 
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Base for all row converters.
+ * Base class for all converters that convert between JDBC object and Flink 
internal object.
  */
 public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
 
        protected final RowType rowType;
-       protected final JdbcFieldConverter[] converters;
+       protected final JdbcFieldConverter[] toInternalConverters;
+       protected final RowFieldConverter[] toExternalConverters;
+       protected final LogicalType[] fieldTypes;
 
        public AbstractJdbcRowConverter(RowType rowType) {
                this.rowType = checkNotNull(rowType);
-               converters = new JdbcFieldConverter[rowType.getFieldCount()];
+               this.fieldTypes = rowType.getFields().stream()
+                       .map(RowType.RowField::getType)
+                       .toArray(LogicalType[]::new);
+               this.toInternalConverters = new 
JdbcFieldConverter[rowType.getFieldCount()];
+               this.toExternalConverters = new 
RowFieldConverter[rowType.getFieldCount()];
+               for (int i = 0; i < rowType.getFieldCount(); i++) {
+                       toInternalConverters[i] = 
createNullableInternalConverter(rowType.getTypeAt(i));
+                       toExternalConverters[i] = 
createNullableExternalConverter(fieldTypes[i]);
+               }
+       }
 
-               for (int i = 0; i < converters.length; i++) {
-                       converters[i] = createConverter(rowType.getTypeAt(i));
+       @Override
+       public RowData toInternal(ResultSet resultSet) throws SQLException {
+               GenericRowData genericRowData = new 
GenericRowData(rowType.getFieldCount());
+               try {
+                       for (int pos = 0; pos < rowType.getFieldCount(); pos++) 
{
+                               genericRowData.setField(pos, 
toInternalConverters[pos].convert(resultSet.getObject(pos + 1)));
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();

Review comment:
       Remove `e.printStackTrace()`; the exception should not be silently swallowed here.
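A minimal sketch of the change, assuming the intent is to stop swallowing the exception and let the already-declared `SQLException` propagate:

```java
// Sketch only: drop e.printStackTrace() and rethrow, since toInternal(ResultSet)
// already declares `throws SQLException`.
try {
	for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
		genericRowData.setField(pos, toInternalConverters[pos].convert(resultSet.getObject(pos + 1)));
	}
} catch (Exception e) {
	throw new SQLException("Failed to convert JDBC ResultSet to internal row data.", e);
}
return genericRowData;
```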

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/AbstractJdbcRowConverter.java
##########
@@ -18,54 +18,273 @@
 
 package org.apache.flink.connector.jdbc.internal.converter;
 
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
 
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Base for all row converters.
+ * Base class for all converters that convert between JDBC object and Flink 
internal object.
  */
 public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
 
        protected final RowType rowType;
-       protected final JdbcFieldConverter[] converters;
+       protected final JdbcFieldConverter[] toInternalConverters;
+       protected final RowFieldConverter[] toExternalConverters;
+       protected final LogicalType[] fieldTypes;
 
        public AbstractJdbcRowConverter(RowType rowType) {
                this.rowType = checkNotNull(rowType);
-               converters = new JdbcFieldConverter[rowType.getFieldCount()];
+               this.fieldTypes = rowType.getFields().stream()
+                       .map(RowType.RowField::getType)
+                       .toArray(LogicalType[]::new);
+               this.toInternalConverters = new 
JdbcFieldConverter[rowType.getFieldCount()];
+               this.toExternalConverters = new 
RowFieldConverter[rowType.getFieldCount()];
+               for (int i = 0; i < rowType.getFieldCount(); i++) {
+                       toInternalConverters[i] = 
createNullableInternalConverter(rowType.getTypeAt(i));
+                       toExternalConverters[i] = 
createNullableExternalConverter(fieldTypes[i]);
+               }
+       }
 
-               for (int i = 0; i < converters.length; i++) {
-                       converters[i] = createConverter(rowType.getTypeAt(i));
+       @Override
+       public RowData toInternal(ResultSet resultSet) throws SQLException {
+               GenericRowData genericRowData = new 
GenericRowData(rowType.getFieldCount());
+               try {
+                       for (int pos = 0; pos < rowType.getFieldCount(); pos++) 
{
+                               genericRowData.setField(pos, 
toInternalConverters[pos].convert(resultSet.getObject(pos + 1)));
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
                }
+
+               return genericRowData;
        }
 
        @Override
-       public Row convert(ResultSet resultSet, Row reuse) throws SQLException {
-               for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
-                       reuse.setField(pos, 
converters[pos].convert(resultSet.getObject(pos + 1)));
+       public PreparedStatement toExternal(RowData rowData, PreparedStatement 
statement) throws SQLException {
+               for (int index = 0; index < rowData.getArity(); index++) {
+                       toExternalConverters[index].convert(statement, index, 
fieldTypes[index], rowData);
                }
+               return statement;       }

Review comment:
       Style: put the closing brace on its own line instead of after the `return` statement.
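Assuming the style issue is the brace placed on the same line as the `return`, the reformatted lines would simply read:

```java
		return statement;
	}
```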

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource}
+ * and {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableSourceSinkFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+       public static final String IDENTIFIER = "jdbc";
+       public static final ConfigOption<String> URL = ConfigOptions
+               .key("url")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc database url.");
+       public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+               .key("table-name")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc table name.");
+       public static final ConfigOption<String> USERNAME = ConfigOptions
+               .key("username")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc user name.");
+       public static final ConfigOption<String> PASSWORD = ConfigOptions
+               .key("password")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc password.");
+       private static final ConfigOption<String> DRIVER = ConfigOptions
+               .key("driver")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the class name of the JDBC driver to use to 
connect to this URL. " +
+                       "If not set, it will automatically be derived from the 
URL.");
+
+       // read config options
+       private static final ConfigOption<String> SCAN_PARTITION_COLUMN = 
ConfigOptions
+               .key("read.partition.column")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the column name used for partitioning the 
input.");
+       private static final ConfigOption<Integer> SCAN_PARTITION_NUM = 
ConfigOptions
+               .key("scan.partition.num")
+               .intType()
+               .noDefaultValue()
+               .withDescription("the number of partitions.");
+       private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = 
ConfigOptions
+               .key("scan.partition.lower-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the smallest value of the first partition.");
+       private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = 
ConfigOptions
+               .key("scan.partition.upper-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the largest value of the last partition.");
+       private static final ConfigOption<Integer> SCAN_FETCH_SIZE = 
ConfigOptions
+               .key("scan.fetch-size")
+               .intType()
+               .defaultValue(0)
+               .withDescription("gives the reader a hint as to the number of 
rows that should be fetched, from" +
+                       " the database when reading per round trip. If the 
value specified is zero, then the hint is ignored. The" +
+                       " default value is zero.");
+
+       // look up config options
+       private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = 
ConfigOptions
+               .key("lookup.cache.max-rows")
+               .longType()
+               .defaultValue(-1L)
+               .withDescription("the max number of rows of lookup cache, over 
this value, the oldest rows will " +
+                       "be eliminated. \"cache.max-rows\" and \"cache.ttl\" 
options must all be specified if any of them is " +
+                       "specified. Cache is not enabled as default.");
+       private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = 
ConfigOptions
+               .key("lookup.cache.ttl")
+               .durationType()
+               .defaultValue(Duration.ofSeconds(10))
+               .withDescription("the cache time to live.");
+       private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = 
ConfigOptions
+               .key("lookup.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if lookup database 
failed.");
+
+       // write config options
+       private static final ConfigOption<Integer> SINK_FLUSH_MAX_ROWS = 
ConfigOptions
+               .key("sink.flush.max-rows")
+               .intType()
+               .defaultValue(5000)
+               .withDescription("the flush max size (includes all append, 
upsert and delete records), over this number" +
+                       " of records, will flush data. The default value is 
5000.");
+       private static final ConfigOption<Long> SINK_FLUSH_INTERVAL = 
ConfigOptions
+               .key("sink.flush.interval")
+               .longType()
+               .defaultValue(0L)
+               .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
+                       "default value is 0, which means no asynchronous flush 
thread will be scheduled.");
+       private static final ConfigOption<Integer> SINK_MAX_RETRIES = 
ConfigOptions
+               .key("sink.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if writing records to 
database failed.");
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+               final ReadableConfig config = helper.getOptions();
+
+               helper.validate();
+               validateConfigOptions(config);
+               JdbcOptions jdbcOptions = getJdbcOptions(config);
+               TableSchema formatSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

Review comment:
       Rename `formatSchema` to `physicalSchema`.
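A sketch of the rename (variable name only; the call itself is unchanged):

```java
// Renamed from formatSchema to physicalSchema, as suggested.
TableSchema physicalSchema =
	TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
```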

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource}
+ * and {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableSourceSinkFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+       public static final String IDENTIFIER = "jdbc";
+       public static final ConfigOption<String> URL = ConfigOptions
+               .key("url")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc database url.");
+       public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+               .key("table-name")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc table name.");
+       public static final ConfigOption<String> USERNAME = ConfigOptions
+               .key("username")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc user name.");
+       public static final ConfigOption<String> PASSWORD = ConfigOptions
+               .key("password")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc password.");
+       private static final ConfigOption<String> DRIVER = ConfigOptions
+               .key("driver")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the class name of the JDBC driver to use to 
connect to this URL. " +
+                       "If not set, it will automatically be derived from the 
URL.");
+
+       // read config options
+       private static final ConfigOption<String> SCAN_PARTITION_COLUMN = 
ConfigOptions
+               .key("read.partition.column")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the column name used for partitioning the 
input.");
+       private static final ConfigOption<Integer> SCAN_PARTITION_NUM = 
ConfigOptions
+               .key("scan.partition.num")
+               .intType()
+               .noDefaultValue()
+               .withDescription("the number of partitions.");
+       private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = 
ConfigOptions
+               .key("scan.partition.lower-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the smallest value of the first partition.");
+       private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = 
ConfigOptions
+               .key("scan.partition.upper-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the largest value of the last partition.");
+       private static final ConfigOption<Integer> SCAN_FETCH_SIZE = 
ConfigOptions
+               .key("scan.fetch-size")
+               .intType()
+               .defaultValue(0)
+               .withDescription("gives the reader a hint as to the number of 
rows that should be fetched, from" +
+                       " the database when reading per round trip. If the 
value specified is zero, then the hint is ignored. The" +
+                       " default value is zero.");
+
+       // look up config options
+       private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = 
ConfigOptions
+               .key("lookup.cache.max-rows")
+               .longType()
+               .defaultValue(-1L)
+               .withDescription("the max number of rows of lookup cache, over 
this value, the oldest rows will " +
+                       "be eliminated. \"cache.max-rows\" and \"cache.ttl\" 
options must all be specified if any of them is " +
+                       "specified. Cache is not enabled as default.");
+       private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = 
ConfigOptions
+               .key("lookup.cache.ttl")
+               .durationType()
+               .defaultValue(Duration.ofSeconds(10))
+               .withDescription("the cache time to live.");
+       private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = 
ConfigOptions
+               .key("lookup.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if lookup database 
failed.");
+
+       // write config options
+       private static final ConfigOption<Integer> SINK_FLUSH_MAX_ROWS = 
ConfigOptions
+               .key("sink.flush.max-rows")
+               .intType()
+               .defaultValue(5000)
+               .withDescription("the flush max size (includes all append, 
upsert and delete records), over this number" +
+                       " of records, will flush data. The default value is 
5000.");
+       private static final ConfigOption<Long> SINK_FLUSH_INTERVAL = 
ConfigOptions
+               .key("sink.flush.interval")
+               .longType()
+               .defaultValue(0L)
+               .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
+                       "default value is 0, which means no asynchronous flush 
thread will be scheduled.");
+       private static final ConfigOption<Integer> SINK_MAX_RETRIES = 
ConfigOptions
+               .key("sink.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if writing records to 
database failed.");
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+               final ReadableConfig config = helper.getOptions();
+
+               helper.validate();
+               validateConfigOptions(config);
+               JdbcOptions jdbcOptions = getJdbcOptions(config);
+               TableSchema formatSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+               return new JdbcDynamicTableSink(
+                       jdbcOptions,
+                       getJdbcExecutionOptions(config),
+                       getJdbcDmlOptions(jdbcOptions, 
context.getCatalogTable().getSchema()),
+                       formatSchema);
+       }
+
+       @Override
+       public DynamicTableSource createDynamicTableSource(Context context) {
+               final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+               final ReadableConfig config = helper.getOptions();
+
+               helper.validate();
+               validateConfigOptions(config);
+               TableSchema formatSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+               int[] selectFields = new 
int[formatSchema.getFieldNames().length];
+               for (int i = 0; i < selectFields.length; i++) {
+                       selectFields[i] = i;
+               }
+               return new JdbcDynamicTableSource(
+                       getJdbcOptions(helper.getOptions()),
+                       getJdbcReadOptions(helper.getOptions()),
+                       getJdbcLookupOptions(helper.getOptions()),
+                       formatSchema,
+                       selectFields);
+       }
+
+       private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
+               final String url = readableConfig.get(URL);
+               final JdbcOptions.Builder builder = JdbcOptions.builder()
+                       .setDBUrl(url)
+                       .setTableName(readableConfig.get(TABLE_NAME))
+                       .setDialect(JdbcDialects.get(url).get());
+
+               
readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+               
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+               
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+               return builder.build();
+       }
+
+       private JdbcReadOptions getJdbcReadOptions(ReadableConfig 
readableConfig) {
+               final Optional<String> partitionColumnName = 
readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+               final JdbcReadOptions.Builder builder = 
JdbcReadOptions.builder();
+               if (partitionColumnName.isPresent()) {
+                       
builder.setPartitionColumnName(partitionColumnName.get());
+                       
builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+                       
builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+                       
builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+               }
+               
readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+               return builder.build();
+       }
+
+       private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig 
readableConfig) {
+               return new JdbcLookupOptions(
+                       
readableConfig.getOptional(LOOKUP_CACHE_MAX_ROWS).orElse(LOOKUP_CACHE_MAX_ROWS.defaultValue()),
+                       
readableConfig.getOptional(LOOKUP_CACHE_TTL).orElse(LOOKUP_CACHE_TTL.defaultValue()).toMillis(),
+                       
readableConfig.getOptional(LOOKUP_MAX_RETRIES).orElse(LOOKUP_MAX_RETRIES.defaultValue()));
+       }
+
+       private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig 
readableConfig) {
+               final JdbcExecutionOptions.Builder builder = new 
JdbcExecutionOptions.Builder();
+               readableConfig.getOptional(SINK_FLUSH_MAX_ROWS)
+                       .ifPresent(builder::withBatchSize);

Review comment:
       No need for `ifPresent` here; these options define default values, so `readableConfig.get(...)` can be used directly.
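A sketch of the simplification, assuming it is fine to rely on the declared defaults: `SINK_FLUSH_MAX_ROWS`, `SINK_FLUSH_INTERVAL` and `SINK_MAX_RETRIES` all define default values, so `readableConfig.get(...)` always yields a value and can be called directly:

```java
// Sketch only: get(...) falls back to each option's default value,
// so the getOptional(...).ifPresent(...) indirection is unnecessary.
builder.withBatchSize(readableConfig.get(SINK_FLUSH_MAX_ROWS));
builder.withBatchIntervalMs(readableConfig.get(SINK_FLUSH_INTERVAL));
builder.withMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
return builder.build();
```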

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource}
+ * and {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableSourceSinkFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+       public static final String IDENTIFIER = "jdbc";
+       public static final ConfigOption<String> URL = ConfigOptions
+               .key("url")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc database url.");
+       public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+               .key("table-name")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc table name.");
+       public static final ConfigOption<String> USERNAME = ConfigOptions
+               .key("username")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc user name.");
+       public static final ConfigOption<String> PASSWORD = ConfigOptions
+               .key("password")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc password.");
+       private static final ConfigOption<String> DRIVER = ConfigOptions
+               .key("driver")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the class name of the JDBC driver to use to 
connect to this URL. " +
+                       "If not set, it will automatically be derived from the 
URL.");
+
+       // read config options
+       private static final ConfigOption<String> SCAN_PARTITION_COLUMN = 
ConfigOptions
+               .key("read.partition.column")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the column name used for partitioning the 
input.");
+       private static final ConfigOption<Integer> SCAN_PARTITION_NUM = 
ConfigOptions
+               .key("scan.partition.num")
+               .intType()
+               .noDefaultValue()
+               .withDescription("the number of partitions.");
+       private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = 
ConfigOptions
+               .key("scan.partition.lower-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the smallest value of the first partition.");
+       private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = 
ConfigOptions
+               .key("scan.partition.upper-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the largest value of the last partition.");
+       private static final ConfigOption<Integer> SCAN_FETCH_SIZE = 
ConfigOptions
+               .key("scan.fetch-size")
+               .intType()
+               .defaultValue(0)
+               .withDescription("gives the reader a hint as to the number of 
rows that should be fetched, from" +
+                       " the database when reading per round trip. If the 
value specified is zero, then the hint is ignored. The" +
+                       " default value is zero.");
+
+       // look up config options
+       private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = 
ConfigOptions
+               .key("lookup.cache.max-rows")
+               .longType()
+               .defaultValue(-1L)
+               .withDescription("the max number of rows of lookup cache, over 
this value, the oldest rows will " +
+                       "be eliminated. \"cache.max-rows\" and \"cache.ttl\" 
options must all be specified if any of them is " +
+                       "specified. Cache is not enabled as default.");
+       private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = 
ConfigOptions
+               .key("lookup.cache.ttl")
+               .durationType()
+               .defaultValue(Duration.ofSeconds(10))
+               .withDescription("the cache time to live.");
+       private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = 
ConfigOptions
+               .key("lookup.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if lookup database 
failed.");
+
+       // write config options
+       private static final ConfigOption<Integer> SINK_FLUSH_MAX_ROWS = 
ConfigOptions
+               .key("sink.flush.max-rows")
+               .intType()
+               .defaultValue(5000)
+               .withDescription("the flush max size (includes all append, 
upsert and delete records), over this number" +
+                       " of records, will flush data. The default value is 
5000.");
+       private static final ConfigOption<Long> SINK_FLUSH_INTERVAL = 
ConfigOptions
+               .key("sink.flush.interval")
+               .longType()
+               .defaultValue(0L)
+               .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
+                       "default value is 0, which means no asynchronous flush 
thread will be scheduled.");
+       private static final ConfigOption<Integer> SINK_MAX_RETRIES = 
ConfigOptions
+               .key("sink.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if writing records to 
database failed.");
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+               final ReadableConfig config = helper.getOptions();
+
+               helper.validate();
+               validateConfigOptions(config);
+               JdbcOptions jdbcOptions = getJdbcOptions(config);
+               TableSchema formatSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+               return new JdbcDynamicTableSink(
+                       jdbcOptions,
+                       getJdbcExecutionOptions(config),
+                       getJdbcDmlOptions(jdbcOptions, 
context.getCatalogTable().getSchema()),
+                       formatSchema);
+       }
+
+       @Override
+       public DynamicTableSource createDynamicTableSource(Context context) {
+               final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+               final ReadableConfig config = helper.getOptions();
+
+               helper.validate();
+               validateConfigOptions(config);
+               TableSchema formatSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+               int[] selectFields = new 
int[formatSchema.getFieldNames().length];
+               for (int i = 0; i < selectFields.length; i++) {
+                       selectFields[i] = i;
+               }
+               return new JdbcDynamicTableSource(
+                       getJdbcOptions(helper.getOptions()),
+                       getJdbcReadOptions(helper.getOptions()),
+                       getJdbcLookupOptions(helper.getOptions()),
+                       formatSchema,
+                       selectFields);
+       }
+
+       private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
+               final String url = readableConfig.get(URL);
+               final JdbcOptions.Builder builder = JdbcOptions.builder()
+                       .setDBUrl(url)
+                       .setTableName(readableConfig.get(TABLE_NAME))
+                       .setDialect(JdbcDialects.get(url).get());
+
+               
readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+               
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+               
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+               return builder.build();
+       }
+
+       private JdbcReadOptions getJdbcReadOptions(ReadableConfig 
readableConfig) {
+               final Optional<String> partitionColumnName = 
readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+               final JdbcReadOptions.Builder builder = 
JdbcReadOptions.builder();
+               if (partitionColumnName.isPresent()) {
+                       
builder.setPartitionColumnName(partitionColumnName.get());
+                       
builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+                       
builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+                       
builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+               }
+               
readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+               return builder.build();
+       }
+
+       private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig 
readableConfig) {
+               return new JdbcLookupOptions(
+                       
readableConfig.getOptional(LOOKUP_CACHE_MAX_ROWS).orElse(LOOKUP_CACHE_MAX_ROWS.defaultValue()),
+                       
readableConfig.getOptional(LOOKUP_CACHE_TTL).orElse(LOOKUP_CACHE_TTL.defaultValue()).toMillis(),
+                       
readableConfig.getOptional(LOOKUP_MAX_RETRIES).orElse(LOOKUP_MAX_RETRIES.defaultValue()));
+       }
+
+       private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig 
readableConfig) {
+               final JdbcExecutionOptions.Builder builder = new 
JdbcExecutionOptions.Builder();
+               readableConfig.getOptional(SINK_FLUSH_MAX_ROWS)
+                       .ifPresent(builder::withBatchSize);
+               readableConfig.getOptional(SINK_FLUSH_INTERVAL)
+                       .ifPresent(builder::withBatchIntervalMs);
+               readableConfig.getOptional(SINK_MAX_RETRIES)
+                       .ifPresent(builder::withMaxRetries);
+               return builder.build();
+       }
+
+       private JdbcDmlOptions getJdbcDmlOptions(JdbcOptions jdbcOptions, 
TableSchema schema) {
+               String[] keyFields = schema.getPrimaryKey()
+                       .map(pk -> pk.getColumns().toArray(new String[0]))
+                       .orElse(null);
+
+               return JdbcDmlOptions.builder()
+                       .withTableName(jdbcOptions.getTableName())
+                       .withDialect(jdbcOptions.getDialect())
+                       .withFieldNames(schema.getFieldNames())
+                       .withKeyFields(keyFields)
+                       .build();
+       }
+
+       @Override
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+               requiredOptions.add(URL);
+               requiredOptions.add(TABLE_NAME);
+               return requiredOptions;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+               optionalOptions.add(DRIVER);
+               optionalOptions.add(USERNAME);
+               optionalOptions.add(PASSWORD);
+               optionalOptions.add(SCAN_PARTITION_COLUMN);
+               optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
+               optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
+               optionalOptions.add(SCAN_PARTITION_NUM);
+               optionalOptions.add(SCAN_FETCH_SIZE);
+               optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
+               optionalOptions.add(LOOKUP_CACHE_TTL);
+               optionalOptions.add(LOOKUP_MAX_RETRIES);
+               optionalOptions.add(SINK_FLUSH_MAX_ROWS);
+               optionalOptions.add(SINK_FLUSH_INTERVAL);
+               optionalOptions.add(SINK_MAX_RETRIES);
+               return optionalOptions;
+       }
+
+       private void validateConfigOptions(ReadableConfig config) {
+               config.getOptional(URL).orElseThrow(() -> new 
IllegalArgumentException(
+                       String.format("Could not find required property: %s", 
URL.key())));
+               config.getOptional(TABLE_NAME).orElseThrow(() -> new 
IllegalArgumentException(
+                       String.format("Could not find required property: %s", 
TABLE_NAME.key())));

Review comment:
       ```suggestion
                        String.format("Could not find required option: %s", TABLE_NAME.key())));
       ```

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource}
+ * and {@link JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableSourceSinkFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+       public static final String IDENTIFIER = "jdbc";
+       public static final ConfigOption<String> URL = ConfigOptions
+               .key("url")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc database url.");
+       public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+               .key("table-name")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc table name.");
+       public static final ConfigOption<String> USERNAME = ConfigOptions
+               .key("username")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc user name.");
+       public static final ConfigOption<String> PASSWORD = ConfigOptions
+               .key("password")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the jdbc password.");
+       private static final ConfigOption<String> DRIVER = ConfigOptions
+               .key("driver")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the class name of the JDBC driver to use to 
connect to this URL. " +
+                       "If not set, it will automatically be derived from the 
URL.");
+
+       // read config options
+       private static final ConfigOption<String> SCAN_PARTITION_COLUMN = 
ConfigOptions
+               .key("read.partition.column")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("the column name used for partitioning the 
input.");
+       private static final ConfigOption<Integer> SCAN_PARTITION_NUM = 
ConfigOptions
+               .key("scan.partition.num")
+               .intType()
+               .noDefaultValue()
+               .withDescription("the number of partitions.");
+       private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = 
ConfigOptions
+               .key("scan.partition.lower-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the smallest value of the first partition.");
+       private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = 
ConfigOptions
+               .key("scan.partition.upper-bound")
+               .longType()
+               .noDefaultValue()
+               .withDescription("the largest value of the last partition.");
+       private static final ConfigOption<Integer> SCAN_FETCH_SIZE = 
ConfigOptions
+               .key("scan.fetch-size")
+               .intType()
+               .defaultValue(0)
+               .withDescription("gives the reader a hint as to the number of 
rows that should be fetched, from" +
+                       " the database when reading per round trip. If the 
value specified is zero, then the hint is ignored. The" +
+                       " default value is zero.");
+
+       // look up config options
+       private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = 
ConfigOptions
+               .key("lookup.cache.max-rows")
+               .longType()
+               .defaultValue(-1L)
+               .withDescription("the max number of rows of lookup cache, over 
this value, the oldest rows will " +
+                       "be eliminated. \"cache.max-rows\" and \"cache.ttl\" 
options must all be specified if any of them is " +
+                       "specified. Cache is not enabled as default.");
+       private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = 
ConfigOptions
+               .key("lookup.cache.ttl")
+               .durationType()
+               .defaultValue(Duration.ofSeconds(10))
+               .withDescription("the cache time to live.");
+       private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = 
ConfigOptions
+               .key("lookup.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if lookup database 
failed.");
+
+       // write config options
+       private static final ConfigOption<Integer> SINK_FLUSH_MAX_ROWS = 
ConfigOptions
+               .key("sink.flush.max-rows")
+               .intType()
+               .defaultValue(5000)
+               .withDescription("the flush max size (includes all append, 
upsert and delete records), over this number" +
+                       " of records, will flush data. The default value is 
5000.");
+       private static final ConfigOption<Long> SINK_FLUSH_INTERVAL = 
ConfigOptions
+               .key("sink.flush.interval")
+               .longType()
+               .defaultValue(0L)
+               .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
+                       "default value is 0, which means no asynchronous flush 
thread will be scheduled.");
+       private static final ConfigOption<Integer> SINK_MAX_RETRIES = 
ConfigOptions
+               .key("sink.max-retries")
+               .intType()
+               .defaultValue(3)
+               .withDescription("the max retry times if writing records to 
database failed.");
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+               final ReadableConfig config = helper.getOptions();
+
+               helper.validate();
+               validateConfigOptions(config);
+               JdbcOptions jdbcOptions = getJdbcOptions(config);
+               TableSchema formatSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+               return new JdbcDynamicTableSink(
+                       jdbcOptions,
+                       getJdbcExecutionOptions(config),
+                       getJdbcDmlOptions(jdbcOptions, 
context.getCatalogTable().getSchema()),
+                       formatSchema);
+       }
+
+       @Override
+       public DynamicTableSource createDynamicTableSource(Context context) {
+               final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+               final ReadableConfig config = helper.getOptions();
+
+               helper.validate();
+               validateConfigOptions(config);
+               TableSchema formatSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+               int[] selectFields = new 
int[formatSchema.getFieldNames().length];
+               for (int i = 0; i < selectFields.length; i++) {
+                       selectFields[i] = i;
+               }
+               return new JdbcDynamicTableSource(
+                       getJdbcOptions(helper.getOptions()),
+                       getJdbcReadOptions(helper.getOptions()),
+                       getJdbcLookupOptions(helper.getOptions()),
+                       formatSchema,
+                       selectFields);
+       }
+
+       private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
+               final String url = readableConfig.get(URL);
+               final JdbcOptions.Builder builder = JdbcOptions.builder()
+                       .setDBUrl(url)
+                       .setTableName(readableConfig.get(TABLE_NAME))
+                       .setDialect(JdbcDialects.get(url).get());
+
+               
readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+               
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+               
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+               return builder.build();
+       }
+
+       private JdbcReadOptions getJdbcReadOptions(ReadableConfig 
readableConfig) {
+               final Optional<String> partitionColumnName = 
readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+               final JdbcReadOptions.Builder builder = 
JdbcReadOptions.builder();
+               if (partitionColumnName.isPresent()) {
+                       
builder.setPartitionColumnName(partitionColumnName.get());
+                       
builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+                       
builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+                       
builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+               }
+               
readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+               return builder.build();
+       }
+
+       private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig 
readableConfig) {
+               return new JdbcLookupOptions(
+                       
readableConfig.getOptional(LOOKUP_CACHE_MAX_ROWS).orElse(LOOKUP_CACHE_MAX_ROWS.defaultValue()),
+                       
readableConfig.getOptional(LOOKUP_CACHE_TTL).orElse(LOOKUP_CACHE_TTL.defaultValue()).toMillis(),
+                       
readableConfig.getOptional(LOOKUP_MAX_RETRIES).orElse(LOOKUP_MAX_RETRIES.defaultValue()));

Review comment:
       Simplify to `readableConfig.get(LOOKUP_CACHE_MAX_ROWS),`; the option declares a default value, so the `getOptional(...).orElse(...)` pattern is redundant.
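Applying the same simplification to all three arguments gives, as a sketch:

```java
// Sketch only: each option declares a default value, so
// getOptional(X).orElse(X.defaultValue()) reduces to get(X).
return new JdbcLookupOptions(
	readableConfig.get(LOOKUP_CACHE_MAX_ROWS),
	readableConfig.get(LOOKUP_CACHE_TTL).toMillis(),
	readableConfig.get(LOOKUP_MAX_RETRIES));
```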



