wuchong commented on a change in pull request #12176: URL: https://github.com/apache/flink/pull/12176#discussion_r426267397
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/PostgresDialect.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.dialect; + +import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * JDBC dialect for Postgres. + */ +public class PostgresDialect extends AbstractDialect { + + private static final long serialVersionUID = 1L; + + // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs: + // https://www.postgresql.org/docs/12/datatype-datetime.html + private static final int MAX_TIMESTAMP_PRECISION = 6; + private static final int MIN_TIMESTAMP_PRECISION = 1; + + // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs: + // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL + private static final int MAX_DECIMAL_PRECISION = 1000; + private static final int MIN_DECIMAL_PRECISION = 1; + + @Override + public boolean canHandle(String url) { + return url.startsWith("jdbc:postgresql:"); + } + + @Override + public JdbcRowConverter getRowConverter(RowType rowType) { + return new PostgresRowConverter(rowType); + } + + @Override + public Optional<String> defaultDriverName() { + return Optional.of("org.postgresql.Driver"); + } + + /** + * Postgres upsert query. It use ON CONFLICT ... DO UPDATE SET.. to replace into Postgres. + */ + @Override + public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String uniqueColumns = Arrays.stream(uniqueKeyFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String updateClause = Arrays.stream(fieldNames) + .map(f -> quoteIdentifier(f) + "=EXCLUDED." 
+ quoteIdentifier(f)) + .collect(Collectors.joining(", ")); + return Optional.of(getInsertIntoStatement(tableName, fieldNames) + + " ON CONFLICT (" + uniqueColumns + ")" + + " DO UPDATE SET " + updateClause + ); + } + + @Override + public String quoteIdentifier(String identifier) { + return identifier; + } + + @Override + public String dialectName() { + return "Postgres"; Review comment: Nit: "PostgreSQL"; ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java ########## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialects; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.Preconditions; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Factory for creating configured instances of {@link JdbcDynamicTableSource} + * and {@link JdbcDynamicTableSink}. 
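(A note on the `PostgresDialect#getUpsertStatement` quoted earlier, which builds the ON CONFLICT ... DO UPDATE SET upsert: a minimal, hypothetical illustration of the SQL it would produce, assuming `getInsertIntoStatement` renders a plain `INSERT INTO ... VALUES (...)`; the table and column names `my_table`, `id`, `name` are made up for the example and are not from the PR.)

```java
// Hypothetical example of getUpsertStatement("my_table", {"id","name"}, {"id"}),
// assuming getInsertIntoStatement produces a standard parameterized INSERT.
// Illustration only; not taken from the PR.
public class PostgresUpsertExample {
    public static void main(String[] args) {
        String insert = "INSERT INTO my_table(id, name) VALUES (?, ?)";
        String upsert = insert
            + " ON CONFLICT (id)"
            + " DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name";
        System.out.println(upsert);
    }
}
```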
+ */ +@Internal +public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "jdbc"; + public static final ConfigOption<String> URL = ConfigOptions + .key("url") + .stringType() + .noDefaultValue() + .withDescription("the jdbc database url."); + public static final ConfigOption<String> TABLE_NAME = ConfigOptions + .key("table-name") + .stringType() + .noDefaultValue() + .withDescription("the jdbc table name."); + public static final ConfigOption<String> USERNAME = ConfigOptions + .key("username") + .stringType() + .noDefaultValue() + .withDescription("the jdbc user name."); + public static final ConfigOption<String> PASSWORD = ConfigOptions + .key("password") + .stringType() + .noDefaultValue() + .withDescription("the jdbc password."); + private static final ConfigOption<String> DRIVER = ConfigOptions + .key("driver") + .stringType() + .noDefaultValue() + .withDescription("the class name of the JDBC driver to use to connect to this URL. " + + "If not set, it will automatically be derived from the URL."); + + // read config options + private static final ConfigOption<String> SCAN_PARTITION_COLUMN = ConfigOptions + .key("scan.partition.column") + .stringType() + .noDefaultValue() + .withDescription("the column name used for partitioning the input."); + private static final ConfigOption<Integer> SCAN_PARTITION_NUM = ConfigOptions + .key("scan.partition.num") + .intType() + .noDefaultValue() + .withDescription("the number of partitions."); + private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = ConfigOptions + .key("scan.partition.lower-bound") + .longType() + .noDefaultValue() + .withDescription("the smallest value of the first partition."); + private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = ConfigOptions + .key("scan.partition.upper-bound") + .longType() + .noDefaultValue() + .withDescription("the largest value of the last partition."); + private static final ConfigOption<Integer> SCAN_FETCH_SIZE = ConfigOptions + .key("scan.fetch-size") + .intType() + .defaultValue(0) + .withDescription("gives the reader a hint as to the number of rows that should be fetched, from" + + " the database when reading per round trip. If the value specified is zero, then the hint is ignored. The" + + " default value is zero."); + + // look up config options + private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions + .key("lookup.cache.max-rows") + .longType() + .defaultValue(-1L) + .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " + + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " + + "specified. Cache is not enabled as default."); + private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions + .key("lookup.cache.ttl") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("the cache time to live."); + private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions + .key("lookup.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if lookup database failed."); + + // write config options + private static final ConfigOption<Integer> SINK_FLUSH_MAX_ROWS = ConfigOptions + .key("sink.flush.max-rows") + .intType() + .defaultValue(5000) + .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + + " of records, will flush data. 
The default value is 5000."); + private static final ConfigOption<Long> SINK_FLUSH_INTERVAL = ConfigOptions + .key("sink.flush.interval") + .longType() + .defaultValue(0L) + .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " + + "default value is 0, which means no asynchronous flush thread will be scheduled."); + private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions + .key("sink.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if writing records to database failed."); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig config = helper.getOptions(); + + helper.validate(); + validateConfigOptions(config); + JdbcOptions jdbcOptions = getJdbcOptions(config); + TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + + return new JdbcDynamicTableSink( + jdbcOptions, + getJdbcExecutionOptions(config), + getJdbcDmlOptions(jdbcOptions, context.getCatalogTable().getSchema()), Review comment: Should use `physicalSchema` here. We should keep primary key in `TableSchemaUtils.getPhysicalSchema`. ES connector also did this: https://github.com/apache/flink/pull/12184/files#diff-8fe9a415913cbd47651e39f66af0114e ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataOutputFormat.java ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.RowData.FieldGetter; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.function.Function; + +import static org.apache.flink.table.data.RowData.createFieldGetter; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * OutputFormat for {@link JdbcDynamicTableSource}. 
+ */ +@Internal +public class JdbcRowDataOutputFormat extends JdbcBatchingOutputFormat<RowData, RowData, JdbcBatchStatementExecutor<RowData>> { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataOutputFormat.class); + + private JdbcBatchStatementExecutor<RowData> deleteExecutor; + private final JdbcDmlOptions dmlOptions; + private final LogicalType[] logicalTypes; + + private JdbcRowDataOutputFormat( + JdbcConnectionProvider connectionProvider, + JdbcDmlOptions dmlOptions, + JdbcExecutionOptions batchOptions, + TypeInformation<RowData> rowDataTypeInfo, + LogicalType[] logicalTypes) { + super( + connectionProvider, + batchOptions, + ctx -> createUpsertRowExecutor(dmlOptions, ctx, rowDataTypeInfo, logicalTypes), + RecordExtractor.identity()); + this.dmlOptions = dmlOptions; + this.logicalTypes = logicalTypes; + } + + private JdbcRowDataOutputFormat( + JdbcConnectionProvider connectionProvider, + JdbcDmlOptions dmlOptions, + JdbcExecutionOptions batchOptions, + TypeInformation<RowData> rowDataTypeInfo, + LogicalType[] logicalTypes, + String sql) { + super(connectionProvider, + batchOptions, + ctx -> createSimpleRowDataExecutor(dmlOptions.getDialect(), sql, logicalTypes, ctx, rowDataTypeInfo), + RecordExtractor.identity()); + this.dmlOptions = dmlOptions; + this.logicalTypes = logicalTypes; + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + deleteExecutor = createDeleteExecutor(); + super.open(taskNumber, numTasks); + try { + deleteExecutor.open(connection); + } catch (SQLException e) { + throw new IOException(e); + } + } + + private JdbcBatchStatementExecutor<RowData> createDeleteExecutor() { + int[] pkFields = Arrays.stream(dmlOptions.getFieldNames()).mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf).toArray(); + LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new); + String deleteSql = dmlOptions.getDialect().getDeleteStatement(dmlOptions.getTableName(), dmlOptions.getFieldNames()); + return createKeyedRowExecutor(dmlOptions.getDialect(), pkFields, pkTypes, deleteSql, logicalTypes); + } + + @Override + protected void addToBatch(RowData original, RowData extracted) throws SQLException { + switch (original.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + super.addToBatch(original, extracted); + break; + case DELETE: + case UPDATE_BEFORE: + deleteExecutor.addToBatch(extracted); + break; + default: + throw new UnsupportedOperationException( + String.format("unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER," + + " DELETE, but get: %s.", original.getRowKind())); + } + } + + @Override + public synchronized void close() { + try { + super.close(); + } finally { + try { + if (deleteExecutor != null) { + deleteExecutor.close(); + } + } catch (SQLException e) { + LOG.warn("unable to close delete statement runner", e); + } + } + } + + @Override + protected void attemptFlush() throws SQLException { + super.attemptFlush(); + deleteExecutor.executeBatch(); + } + + private static JdbcBatchStatementExecutor<RowData> createKeyedRowExecutor(JdbcDialect dialect, int[] pkFields, LogicalType[] pkTypes, String sql, LogicalType[] logicalTypes) { + final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(logicalTypes)); + final Function<RowData, RowData> keyExtractor = createRowKeyExtractor(logicalTypes, pkFields); + return JdbcBatchStatementExecutor.keyed( + sql, + keyExtractor, + (st, 
record) -> rowConverter + .toExternal(keyExtractor.apply(record), st)); + } + + private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(JdbcDmlOptions opt, RuntimeContext ctx, TypeInformation<RowData> rowDataTypeInfo, LogicalType[] logicalTypes) { + checkArgument(opt.getKeyFields().isPresent()); + + int[] pkFields = Arrays.stream(opt.getKeyFields().get()).mapToInt(Arrays.asList(opt.getFieldNames())::indexOf).toArray(); + LogicalType[] pkTypes = Arrays.stream(pkFields).mapToObj(f -> logicalTypes[f]).toArray(LogicalType[]::new); + JdbcDialect dialect = opt.getDialect(); + final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(ctx.getExecutionConfig()); + return opt.getDialect() + .getUpsertStatement(opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get()) + .map(sql -> createSimpleRowDataExecutor(dialect, sql, logicalTypes, ctx, rowDataTypeInfo)) + .orElseGet(() -> + new InsertOrUpdateJdbcExecutor<>( + opt.getDialect().getRowExistsStatement(opt.getTableName(), opt.getKeyFields().get()), + opt.getDialect().getInsertIntoStatement(opt.getTableName(), opt.getFieldNames()), + opt.getDialect().getUpdateStatement(opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get()), + createRowDataJdbcStatementBuilder(dialect, pkTypes), + createRowDataJdbcStatementBuilder(dialect, logicalTypes), + createRowDataJdbcStatementBuilder(dialect, logicalTypes), + createRowKeyExtractor(logicalTypes, pkFields), + ctx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : r -> r)); + } + + private static Function<RowData, RowData> createRowKeyExtractor(LogicalType[] logicalTypes, int[] pkFields) { + final FieldGetter[] fieldGetters = new FieldGetter[pkFields.length]; + for (int i = 0; i < pkFields.length; i++) { + fieldGetters[i] = createFieldGetter(logicalTypes[pkFields[i]], pkFields[i]); + } + return row -> getPrimaryKey(row, fieldGetters); + } + + private static JdbcBatchStatementExecutor<RowData> createSimpleRowDataExecutor(JdbcDialect dialect, String sql, LogicalType[] fieldTypes, RuntimeContext ctx, TypeInformation<RowData> rowDataTypeInfo) { + final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(ctx.getExecutionConfig()); + return JdbcBatchStatementExecutor.simple( + sql, + createRowDataJdbcStatementBuilder(dialect, fieldTypes), + ctx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity()); + } + + /** + * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array. + * Uses {@link JdbcUtils#setRecordToStatement} + */ + private static JdbcStatementBuilder<RowData> createRowDataJdbcStatementBuilder(JdbcDialect dialect, LogicalType[] types) { + return (st, record) -> dialect.getRowConverter(RowType.of(types)).toExternal(record, st); Review comment: Do not call `dialect.getRowConverter` during runtime. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
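(On the "Do not call `dialect.getRowConverter` during runtime" comment above, about `createRowDataJdbcStatementBuilder` in `JdbcRowDataOutputFormat`: one possible direction, sketched here as an assumption rather than the PR's eventual fix, is to resolve the converter once and let the per-record lambda close over it, so no dialect lookup happens on the hot path. A fuller fix would create the converter when the output format itself is constructed and pass it in.)

```java
// Sketch (assumed refactoring, not the PR's final code): build the JdbcRowConverter
// once, outside the per-record lambda, instead of calling dialect.getRowConverter
// for every record that is written.
private static JdbcStatementBuilder<RowData> createRowDataJdbcStatementBuilder(
        JdbcDialect dialect, LogicalType[] types) {
    final JdbcRowConverter converter = dialect.getRowConverter(RowType.of(types));
    return (st, record) -> converter.toExternal(record, st);
}
```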
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; +import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.InputFormatProvider; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.TableFunctionProvider; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; + +/** + * A {@link DynamicTableSource} for JDBC. + */ +@Internal +public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource { + + private static final String name = "JdbcTableSource"; + private final JdbcOptions options; + private final JdbcReadOptions readOptions; + private final JdbcLookupOptions lookupOptions; + private final TableSchema schema; + private final int[] selectFields; + + public JdbcDynamicTableSource( + JdbcOptions options, + JdbcReadOptions readOptions, + JdbcLookupOptions lookupOptions, + TableSchema schema, + int[] selectFields) { + this.options = options; + this.readOptions = readOptions; + this.lookupOptions = lookupOptions; + this.schema = schema; + this.selectFields = selectFields; + } + + @Override + public LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.Context context) { + // JDBC only support non-nested look up keys + String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { + int index = context.getKeys()[i][0]; + keyNames[i] = schema.getFieldNames()[index]; + } + return TableFunctionProvider.of(JdbcDynamicLookupFunction.builder() + .setFieldNames(schema.getFieldNames()) + .setFieldTypes(schema.getFieldDataTypes()) + .setKeyNames(keyNames) + .setOptions(options) + .setLookupOptions(lookupOptions) + .build()); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) { + final DataType rowDataType = schema.toPhysicalRowDataType(); + final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(rowDataType); + final JdbcDynamicInputFormat.Builder builder = JdbcDynamicInputFormat.builder() + .setDrivername(options.getDriverName()) + .setDBUrl(options.getDbURL()) + .setUsername(options.getUsername().orElse(null)) + 
.setPassword(options.getPassword().orElse(null)) + .setRowTypeInfo(new RowTypeInfo(rowTypeInfo.getFieldTypes(), rowTypeInfo.getFieldNames())); + + if (readOptions.getFetchSize() != 0) { + builder.setFetchSize(readOptions.getFetchSize()); + } + final JdbcDialect dialect = options.getDialect(); + String query = dialect.getSelectFromStatement( + options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]); + if (readOptions.getPartitionColumnName().isPresent()) { + long lowerBound = readOptions.getPartitionLowerBound().get(); + long upperBound = readOptions.getPartitionUpperBound().get(); + int numPartitions = readOptions.getNumPartitions().get(); + builder.setParametersProvider( + new JdbcNumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions)); + query += " WHERE " + + dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) + + " BETWEEN ? AND ?"; + } + builder.setQuery(query); + final RowType rowType = RowType.of( + Arrays.stream(schema.getFieldDataTypes()) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new), + schema.getFieldNames()); Review comment: Not resolved yet. ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java ########## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialects; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.Preconditions; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Factory for creating configured instances of {@link JdbcDynamicTableSource} + * and {@link JdbcDynamicTableSink}. + */ +@Internal +public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "jdbc"; + public static final ConfigOption<String> URL = ConfigOptions + .key("url") + .stringType() + .noDefaultValue() + .withDescription("the jdbc database url."); + public static final ConfigOption<String> TABLE_NAME = ConfigOptions + .key("table-name") + .stringType() + .noDefaultValue() + .withDescription("the jdbc table name."); + public static final ConfigOption<String> USERNAME = ConfigOptions + .key("username") + .stringType() + .noDefaultValue() + .withDescription("the jdbc user name."); + public static final ConfigOption<String> PASSWORD = ConfigOptions + .key("password") + .stringType() + .noDefaultValue() + .withDescription("the jdbc password."); + private static final ConfigOption<String> DRIVER = ConfigOptions + .key("driver") + .stringType() + .noDefaultValue() + .withDescription("the class name of the JDBC driver to use to connect to this URL. 
" + + "If not set, it will automatically be derived from the URL."); + + // read config options + private static final ConfigOption<String> SCAN_PARTITION_COLUMN = ConfigOptions + .key("scan.partition.column") + .stringType() + .noDefaultValue() + .withDescription("the column name used for partitioning the input."); + private static final ConfigOption<Integer> SCAN_PARTITION_NUM = ConfigOptions + .key("scan.partition.num") + .intType() + .noDefaultValue() + .withDescription("the number of partitions."); + private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = ConfigOptions + .key("scan.partition.lower-bound") + .longType() + .noDefaultValue() + .withDescription("the smallest value of the first partition."); + private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = ConfigOptions + .key("scan.partition.upper-bound") + .longType() + .noDefaultValue() + .withDescription("the largest value of the last partition."); + private static final ConfigOption<Integer> SCAN_FETCH_SIZE = ConfigOptions + .key("scan.fetch-size") + .intType() + .defaultValue(0) + .withDescription("gives the reader a hint as to the number of rows that should be fetched, from" + + " the database when reading per round trip. If the value specified is zero, then the hint is ignored. The" + + " default value is zero."); + + // look up config options + private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions + .key("lookup.cache.max-rows") + .longType() + .defaultValue(-1L) + .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " + + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " + + "specified. Cache is not enabled as default."); + private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions + .key("lookup.cache.ttl") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("the cache time to live."); + private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions + .key("lookup.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if lookup database failed."); + + // write config options + private static final ConfigOption<Integer> SINK_FLUSH_MAX_ROWS = ConfigOptions + .key("sink.flush.max-rows") Review comment: "sink.buffer-flush.max-rows" ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java ########## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialects; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.Preconditions; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Factory for creating configured instances of {@link JdbcDynamicTableSource} + * and {@link JdbcDynamicTableSink}. + */ +@Internal +public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "jdbc"; + public static final ConfigOption<String> URL = ConfigOptions + .key("url") + .stringType() + .noDefaultValue() + .withDescription("the jdbc database url."); + public static final ConfigOption<String> TABLE_NAME = ConfigOptions + .key("table-name") + .stringType() + .noDefaultValue() + .withDescription("the jdbc table name."); + public static final ConfigOption<String> USERNAME = ConfigOptions + .key("username") + .stringType() + .noDefaultValue() + .withDescription("the jdbc user name."); + public static final ConfigOption<String> PASSWORD = ConfigOptions + .key("password") + .stringType() + .noDefaultValue() + .withDescription("the jdbc password."); + private static final ConfigOption<String> DRIVER = ConfigOptions + .key("driver") + .stringType() + .noDefaultValue() + .withDescription("the class name of the JDBC driver to use to connect to this URL. 
" + + "If not set, it will automatically be derived from the URL."); + + // read config options + private static final ConfigOption<String> SCAN_PARTITION_COLUMN = ConfigOptions + .key("scan.partition.column") + .stringType() + .noDefaultValue() + .withDescription("the column name used for partitioning the input."); + private static final ConfigOption<Integer> SCAN_PARTITION_NUM = ConfigOptions + .key("scan.partition.num") + .intType() + .noDefaultValue() + .withDescription("the number of partitions."); + private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = ConfigOptions + .key("scan.partition.lower-bound") + .longType() + .noDefaultValue() + .withDescription("the smallest value of the first partition."); + private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = ConfigOptions + .key("scan.partition.upper-bound") + .longType() + .noDefaultValue() + .withDescription("the largest value of the last partition."); + private static final ConfigOption<Integer> SCAN_FETCH_SIZE = ConfigOptions + .key("scan.fetch-size") + .intType() + .defaultValue(0) + .withDescription("gives the reader a hint as to the number of rows that should be fetched, from" + + " the database when reading per round trip. If the value specified is zero, then the hint is ignored. The" + + " default value is zero."); + + // look up config options + private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions + .key("lookup.cache.max-rows") + .longType() + .defaultValue(-1L) + .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " + + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " + + "specified. Cache is not enabled as default."); + private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions + .key("lookup.cache.ttl") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("the cache time to live."); + private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions + .key("lookup.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if lookup database failed."); + + // write config options + private static final ConfigOption<Integer> SINK_FLUSH_MAX_ROWS = ConfigOptions + .key("sink.flush.max-rows") + .intType() + .defaultValue(5000) + .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + + " of records, will flush data. The default value is 5000."); + private static final ConfigOption<Long> SINK_FLUSH_INTERVAL = ConfigOptions + .key("sink.flush.interval") Review comment: "sink.buffer-flush.interval" ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialects; +import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A lookup function for {@link JdbcDynamicTableSource}. 
+ */ +@Internal +public class JdbcRowDataLookupFunction extends TableFunction<RowData> { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); + private static final long serialVersionUID = 1L; + + private final String query; + private final String drivername; + private final String dbURL; + private final String username; + private final String password; + private final DataType[] keyTypes; + private final int[] keySqlTypes; + private final long cacheMaxSize; + private final long cacheExpireMs; + private final int maxRetryTimes; + private final JdbcDialect jdbcDialect; + private final JdbcRowConverter jdbcRowConverter; + + private transient Connection dbConn; + private transient PreparedStatement statement; + private transient Cache<RowData, List<RowData>> cache; + + public JdbcRowDataLookupFunction( + JdbcOptions options, + JdbcLookupOptions lookupOptions, + String[] fieldNames, + DataType[] fieldTypes, + String[] keyNames, + RowType rowType) { + checkNotNull(options, "No JdbcOptions supplied."); + checkNotNull(fieldNames, "No fieldNames supplied."); + checkNotNull(fieldTypes, "No fieldTypes supplied."); + checkNotNull(keyNames, "No keyNames supplied."); + this.drivername = options.getDriverName(); + this.dbURL = options.getDbURL(); + this.username = options.getUsername().orElse(null); + this.password = options.getPassword().orElse(null); + List<String> nameList = Arrays.asList(fieldNames); + this.keyTypes = Arrays.stream(keyNames) + .map(s -> { + checkArgument(nameList.contains(s), + "keyName %s can't find in fieldNames %s.", s, nameList); + return fieldTypes[nameList.indexOf(s)]; + }) + .toArray(DataType[]::new); + this.cacheMaxSize = lookupOptions.getCacheMaxSize(); + this.cacheExpireMs = lookupOptions.getCacheExpireMs(); + this.maxRetryTimes = lookupOptions.getMaxRetryTimes(); + this.keySqlTypes = Arrays.stream(keyTypes) + .map(TypeConversions::fromDataTypeToLegacyInfo) + .mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray(); + this.query = options.getDialect().getSelectFromStatement( + options.getTableName(), fieldNames, keyNames); + this.jdbcDialect = JdbcDialects.get(dbURL) + .orElseThrow(() -> new UnsupportedOperationException(String.format("Unknown dbUrl:%s", dbURL))); + this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType); + } + + @Override + public void open(FunctionContext context) throws Exception { + try { + establishConnection(); + statement = dbConn.prepareStatement(query); + this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder() + .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) + .maximumSize(cacheMaxSize) + .build(); + } catch (SQLException sqe) { + throw new IllegalArgumentException("open() failed.", sqe); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("JDBC driver class not found.", cnfe); + } + } + + /** + * This is a lookup method which is called by Flink framework in runtime. + * @param keys lookup keys + */ + public void eval(Object... 
keys) { + RowData keyRow = GenericRowData.of(keys); + if (cache != null) { + List<RowData> cachedRows = cache.getIfPresent(keyRow); + if (cachedRows != null) { + for (RowData cachedRow : cachedRows) { + collect(cachedRow); + } + return; + } + } + + for (int retry = 1; retry <= maxRetryTimes; retry++) { + try { + statement.clearParameters(); + for (int i = 0; i < keys.length; i++) { + JdbcUtils.setField(statement, keySqlTypes[i], keys[i], i); + } + try (ResultSet resultSet = statement.executeQuery()) { + if (cache == null) { + while (resultSet.next()) { + collect(convertToRowFromResultSet(resultSet)); + } + } else { + ArrayList<RowData> rows = new ArrayList<>(); + while (resultSet.next()) { + RowData row = convertToRowFromResultSet(resultSet); + rows.add(row); + collect(row); + } + rows.trimToSize(); + cache.put(keyRow, rows); + } + } + break; + } catch (SQLException e) { + LOG.error(String.format("JDBC executeBatch error, retry times = %d", retry), e); + if (retry >= maxRetryTimes) { + throw new RuntimeException("Execution of JDBC statement failed.", e); + } + + try { + Thread.sleep(1000 * retry); + } catch (InterruptedException e1) { + throw new RuntimeException(e1); + } + } + } + } + + private RowData convertToRowFromResultSet(ResultSet resultSet) throws SQLException { + return jdbcRowConverter.toInternal(resultSet); Review comment: Why wrap such a simple call into a function? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialects; +import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; +import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; +import org.apache.flink.connector.jdbc.utils.JdbcUtils; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A lookup function for {@link JdbcDynamicTableSource}. + */ +@Internal +public class JdbcRowDataLookupFunction extends TableFunction<RowData> { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); + private static final long serialVersionUID = 1L; + + private final String query; + private final String drivername; + private final String dbURL; + private final String username; + private final String password; + private final DataType[] keyTypes; + private final int[] keySqlTypes; + private final long cacheMaxSize; + private final long cacheExpireMs; + private final int maxRetryTimes; + private final JdbcDialect jdbcDialect; + private final JdbcRowConverter jdbcRowConverter; + + private transient Connection dbConn; + private transient PreparedStatement statement; + private transient Cache<RowData, List<RowData>> cache; + + public JdbcRowDataLookupFunction( + JdbcOptions options, + JdbcLookupOptions lookupOptions, + String[] fieldNames, + DataType[] fieldTypes, + String[] keyNames, + RowType rowType) { + checkNotNull(options, "No JdbcOptions supplied."); + checkNotNull(fieldNames, "No fieldNames supplied."); + checkNotNull(fieldTypes, "No fieldTypes supplied."); + checkNotNull(keyNames, "No keyNames supplied."); + this.drivername = options.getDriverName(); + this.dbURL = options.getDbURL(); + this.username = options.getUsername().orElse(null); + this.password = options.getPassword().orElse(null); + List<String> nameList = Arrays.asList(fieldNames); + this.keyTypes = Arrays.stream(keyNames) + .map(s -> { + checkArgument(nameList.contains(s), + "keyName %s can't find in fieldNames %s.", s, nameList); + return fieldTypes[nameList.indexOf(s)]; + }) + .toArray(DataType[]::new); + this.cacheMaxSize = lookupOptions.getCacheMaxSize(); + this.cacheExpireMs = lookupOptions.getCacheExpireMs(); + this.maxRetryTimes = lookupOptions.getMaxRetryTimes(); + this.keySqlTypes = Arrays.stream(keyTypes) + 
.map(TypeConversions::fromDataTypeToLegacyInfo) + .mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray(); + this.query = options.getDialect().getSelectFromStatement( + options.getTableName(), fieldNames, keyNames); + this.jdbcDialect = JdbcDialects.get(dbURL) + .orElseThrow(() -> new UnsupportedOperationException(String.format("Unknown dbUrl:%s", dbURL))); + this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType); + } + + @Override + public void open(FunctionContext context) throws Exception { + try { + establishConnection(); + statement = dbConn.prepareStatement(query); + this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder() + .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) + .maximumSize(cacheMaxSize) + .build(); + } catch (SQLException sqe) { + throw new IllegalArgumentException("open() failed.", sqe); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("JDBC driver class not found.", cnfe); + } + } + + /** + * This is a lookup method which is called by Flink framework in runtime. + * @param keys lookup keys + */ + public void eval(Object... keys) { + RowData keyRow = GenericRowData.of(keys); + if (cache != null) { + List<RowData> cachedRows = cache.getIfPresent(keyRow); + if (cachedRows != null) { + for (RowData cachedRow : cachedRows) { + collect(cachedRow); + } + return; + } + } + + for (int retry = 1; retry <= maxRetryTimes; retry++) { + try { + statement.clearParameters(); + for (int i = 0; i < keys.length; i++) { + JdbcUtils.setField(statement, keySqlTypes[i], keys[i], i); Review comment: This is wrong. You should use `JdbcRowConverter` to set the statements, otherwise, if the lookup key is a string type or timestamp type, an exception will be thrown. You can also add a test for this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
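(On the final comment above, about `JdbcUtils.setField` in `JdbcRowDataLookupFunction#eval`: a hedged sketch of the suggested direction. `lookupKeyRowConverter` is a hypothetical field name, assumed to be a `JdbcRowConverter` built from the lookup-key types, e.g. via `jdbcDialect.getRowConverter(RowType.of(keyLogicalTypes))` in the constructor, so that keys of types such as STRING or TIMESTAMP are bound through the same conversion path as regular rows.)

```java
// Sketch only (lookupKeyRowConverter is a hypothetical field): bind the lookup keys
// through a JdbcRowConverter derived from the key types instead of calling
// JdbcUtils.setField with legacy SQL type codes.
statement.clearParameters();
lookupKeyRowConverter.toExternal(GenericRowData.of(keys), statement);
```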
