twalthr commented on a change in pull request #16334: URL: https://github.com/apache/flink/pull/16334#discussion_r666055781
########## File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptionsUtil.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.hbase.options; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.connector.hbase.util.HBaseTableSchema; +import org.apache.flink.table.api.TableSchema; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; + +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_MAX_ROWS; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_PARALLELISM; +import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM; +import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT; + +/** Utilities for {@link HBaseOptions}. */ +@Internal +public class HBaseOptionsUtil { Review comment: is default scope possible for these utilities that should always be next to the factory in the same package? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOptions.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import java.time.Duration; + +/** Options for the JDBC connector. */ +@PublicEvolving +public class JdbcOptions { + + public static final ConfigOption<String> URL = + ConfigOptions.key("url") + .stringType() + .noDefaultValue() + .withDescription("The JDBC database URL."); + + public static final ConfigOption<String> TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription("The JDBC table name."); + + public static final ConfigOption<String> USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription("The JDBC user name."); + + public static final ConfigOption<String> PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("The JDBC password."); + + public static final ConfigOption<String> DRIVER = + ConfigOptions.key("driver") + .stringType() + .noDefaultValue() + .withDescription( + "The class name of the JDBC driver to use to connect to this URL. " + + "If not set, it will automatically be derived from the URL."); + + public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT = + ConfigOptions.key("connection.max-retry-timeout") + .durationType() + .defaultValue(Duration.ofSeconds(60)) + .withDescription("Maximum timeout between retries."); + + public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; + + // ----------------------------------------------------------------------------------------- + // Read options Review comment: `Scan` options ########## File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptionsUtil.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.hbase.options; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.connector.hbase.util.HBaseTableSchema; +import org.apache.flink.table.api.TableSchema; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; + +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_MAX_ROWS; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_PARALLELISM; +import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM; +import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT; + +/** Utilities for {@link HBaseOptions}. */ +@Internal +public class HBaseOptionsUtil { + + /** Prefix for HBase specific properties. */ + public static final String PROPERTIES_PREFIX = "properties."; + + // -------------------------------------------------------------------------------------------- + // Validation + // -------------------------------------------------------------------------------------------- + + /** + * Checks that the HBase table have row key defined. A row key is defined as an atomic type, and + * column families and qualifiers are defined as ROW type. There shouldn't be multiple atomic + * type columns in the schema. The PRIMARY KEY constraint is optional, if exist, the primary key + * constraint must be defined on the single row key field. + */ + public static void validatePrimaryKey(TableSchema schema) { + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema); + if (!hbaseSchema.getRowKeyName().isPresent()) { + throw new IllegalArgumentException( + "HBase table requires to define a row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."); + } + schema.getPrimaryKey() + .ifPresent( + k -> { + if (k.getColumns().size() > 1) { + throw new IllegalArgumentException( + "HBase table doesn't support a primary Key on multiple columns. " + + "The primary key of HBase table must be defined on row key field."); + } + if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) { + throw new IllegalArgumentException( + "Primary key of HBase table must be defined on the row key field. 
" + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."); + } + }); + } + + public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions) { + HBaseWriteOptions.Builder builder = HBaseWriteOptions.builder(); + builder.setBufferFlushIntervalMillis( + tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); + builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS)); + builder.setBufferFlushMaxSizeInBytes( + tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes()); + builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null)); + return builder.build(); + } + + public static HBaseLookupOptions getHBaseLookupOptions(ReadableConfig tableOptions) { + HBaseLookupOptions.Builder builder = HBaseLookupOptions.builder(); + builder.setLookupAsync(tableOptions.get(HBaseOptions.LOOKUP_ASYNC)); + builder.setMaxRetryTimes(tableOptions.get(HBaseOptions.LOOKUP_MAX_RETRIES)); + builder.setCacheExpireMs(tableOptions.get(HBaseOptions.LOOKUP_CACHE_TTL).toMillis()); + builder.setCacheMaxSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS)); + return builder.build(); + } + + /** + * config HBase Configuration. + * + * @param options properties option + */ + public static Configuration getHBaseConfiguration(Map<String, String> options) { + org.apache.flink.configuration.Configuration tableOptions = + org.apache.flink.configuration.Configuration.fromMap(options); + // create default configuration from current runtime env (`hbase-site.xml` in classpath) + // first, + Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration(); + hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.getString(ZOOKEEPER_QUORUM)); + hbaseClientConf.set( + HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.getString(ZOOKEEPER_ZNODE_PARENT)); + // add HBase properties + final Properties properties = getHBaseClientProperties(options); + properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), v.toString())); + return hbaseClientConf; + } + + private static Properties getHBaseClientProperties(Map<String, String> tableOptions) { Review comment: could we add a `[hotfix]` commit to replace this prefix magic? We can replace this with a map option type nowadays. Also for the Kafka connector and maybe also for the Kinesis connector. Otherwise the new Option classes are not very useful if they are less complete. ########## File path: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java ########## @@ -18,12 +18,14 @@ package org.apache.flink.formats.avro.registry.confluent; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import java.util.Map; /** Options for Schema Registry Avro format. */ +@PublicEvolving public class RegistryAvroOptions { private RegistryAvroOptions() {} Review comment: nit: put the constructor to the end as in almost all other classes ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/formats/raw/RawOptions.java ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats.raw; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.nio.charset.StandardCharsets; + +/** Options for the "raw" format. */ +@PublicEvolving +public class RawOptions { + + public static final String BIG_ENDIAN = "big-endian"; Review comment: we should improve our `ConfigOption` design to allow enums with special characters, it doesn't help if we support enum type but nobody wants to use them and they use string type again ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java ########## @@ -280,567 +253,10 @@ private KafkaOptions() {} + "must be set to be greater than zero to enable sink buffer flushing.") .build()); - private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT = - ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue(); - - // -------------------------------------------------------------------------------------------- - // Option enumerations - // -------------------------------------------------------------------------------------------- - - // Start up offset. - public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; - public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; - public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets"; - public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets"; - public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; - - private static final Set<String> SCAN_STARTUP_MODE_ENUMS = - new HashSet<>( - Arrays.asList( - SCAN_STARTUP_MODE_VALUE_EARLIEST, - SCAN_STARTUP_MODE_VALUE_LATEST, - SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS, - SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, - SCAN_STARTUP_MODE_VALUE_TIMESTAMP)); - - // Sink partitioner. - public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default"; - public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed"; - public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin"; - - // Sink semantic - public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = "exactly-once"; - public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = "at-least-once"; - public static final String SINK_SEMANTIC_VALUE_NONE = "none"; - - private static final Set<String> SINK_SEMANTIC_ENUMS = - new HashSet<>( - Arrays.asList( - SINK_SEMANTIC_VALUE_AT_LEAST_ONCE, - SINK_SEMANTIC_VALUE_EXACTLY_ONCE, - SINK_SEMANTIC_VALUE_NONE)); - - // Prefix for Kafka specific properties. - public static final String PROPERTIES_PREFIX = "properties."; - - // Other keywords. 
- private static final String PARTITION = "partition"; - private static final String OFFSET = "offset"; - protected static final String AVRO_CONFLUENT = "avro-confluent"; - protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent"; - private static final List<String> SCHEMA_REGISTRY_FORMATS = - Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT); - - // -------------------------------------------------------------------------------------------- - // Validation - // -------------------------------------------------------------------------------------------- - - public static void validateTableSourceOptions(ReadableConfig tableOptions) { - validateSourceTopic(tableOptions); - validateScanStartupMode(tableOptions); - } - - public static void validateTableSinkOptions(ReadableConfig tableOptions) { - validateSinkTopic(tableOptions); - validateSinkPartitioner(tableOptions); - validateSinkSemantic(tableOptions); - } - - public static void validateSourceTopic(ReadableConfig tableOptions) { - Optional<List<String>> topic = tableOptions.getOptional(TOPIC); - Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN); - - if (topic.isPresent() && pattern.isPresent()) { - throw new ValidationException( - "Option 'topic' and 'topic-pattern' shouldn't be set together."); - } - - if (!topic.isPresent() && !pattern.isPresent()) { - throw new ValidationException("Either 'topic' or 'topic-pattern' must be set."); - } - } - - public static void validateSinkTopic(ReadableConfig tableOptions) { - String errorMessageTemp = - "Flink Kafka sink currently only supports single topic, but got %s: %s."; - if (!isSingleTopic(tableOptions)) { - if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) { - throw new ValidationException( - String.format( - errorMessageTemp, - "'topic-pattern'", - tableOptions.get(TOPIC_PATTERN))); - } else { - throw new ValidationException( - String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC))); - } - } - } - - private static void validateScanStartupMode(ReadableConfig tableOptions) { - tableOptions - .getOptional(SCAN_STARTUP_MODE) - .map(String::toLowerCase) - .ifPresent( - mode -> { - if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) { - throw new ValidationException( - String.format( - "Invalid value for option '%s'. 
Supported values are %s, but was: %s", - SCAN_STARTUP_MODE.key(), - "[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]", - mode)); - } - - if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) { - if (!tableOptions - .getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS) - .isPresent()) { - throw new ValidationException( - String.format( - "'%s' is required in '%s' startup mode" - + " but missing.", - SCAN_STARTUP_TIMESTAMP_MILLIS.key(), - SCAN_STARTUP_MODE_VALUE_TIMESTAMP)); - } - } - if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) { - if (!tableOptions - .getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS) - .isPresent()) { - throw new ValidationException( - String.format( - "'%s' is required in '%s' startup mode" - + " but missing.", - SCAN_STARTUP_SPECIFIC_OFFSETS.key(), - SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)); - } - if (!isSingleTopic(tableOptions)) { - throw new ValidationException( - "Currently Kafka source only supports specific offset for single topic."); - } - String specificOffsets = - tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS); - parseSpecificOffsets( - specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key()); - } - }); - } - - private static void validateSinkPartitioner(ReadableConfig tableOptions) { - tableOptions - .getOptional(SINK_PARTITIONER) - .ifPresent( - partitioner -> { - if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN) - && tableOptions.getOptional(KEY_FIELDS).isPresent()) { - throw new ValidationException( - "Currently 'round-robin' partitioner only works when option 'key.fields' is not specified."); - } else if (partitioner.isEmpty()) { - throw new ValidationException( - String.format( - "Option '%s' should be a non-empty string.", - SINK_PARTITIONER.key())); - } - }); - } - - private static void validateSinkSemantic(ReadableConfig tableOptions) { - tableOptions - .getOptional(SINK_SEMANTIC) - .ifPresent( - semantic -> { - if (!SINK_SEMANTIC_ENUMS.contains(semantic)) { - throw new ValidationException( - String.format( - "Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].", - semantic, SINK_SEMANTIC.key())); - } - }); - } - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions) { - switch (tableOptions.get(SINK_SEMANTIC)) { - case SINK_SEMANTIC_VALUE_EXACTLY_ONCE: - return EXACTLY_ONCE; - case SINK_SEMANTIC_VALUE_AT_LEAST_ONCE: - return AT_LEAST_ONCE; - case SINK_SEMANTIC_VALUE_NONE: - return NONE; - default: - throw new TableException("Validator should have checked that"); - } - } - - public static List<String> getSourceTopics(ReadableConfig tableOptions) { - return tableOptions.getOptional(TOPIC).orElse(null); - } - - public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) { - return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null); - } - - private static boolean isSingleTopic(ReadableConfig tableOptions) { - // Option 'topic-pattern' is regarded as multi-topics. 
- return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false); - } - - public static StartupOptions getStartupOptions(ReadableConfig tableOptions) { - final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); - final StartupMode startupMode = - tableOptions - .getOptional(SCAN_STARTUP_MODE) - .map( - modeString -> { - switch (modeString) { - case SCAN_STARTUP_MODE_VALUE_EARLIEST: - return StartupMode.EARLIEST; - - case SCAN_STARTUP_MODE_VALUE_LATEST: - return StartupMode.LATEST; - - case SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS: - return StartupMode.GROUP_OFFSETS; - - case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS: - // It will be refactored after support specific offset - // for multiple topics in FLINK-18602. - // We have already checked tableOptions.get(TOPIC) - // contains one topic in validateScanStartupMode(). - buildSpecificOffsets( - tableOptions, - tableOptions.get(TOPIC).get(0), - specificOffsets); - return StartupMode.SPECIFIC_OFFSETS; - - case SCAN_STARTUP_MODE_VALUE_TIMESTAMP: - return StartupMode.TIMESTAMP; - - default: - throw new TableException( - "Unsupported startup mode. Validator should have checked that."); - } - }) - .orElse(StartupMode.GROUP_OFFSETS); - final StartupOptions options = new StartupOptions(); - options.startupMode = startupMode; - options.specificOffsets = specificOffsets; - if (startupMode == StartupMode.TIMESTAMP) { - options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS); - } - return options; - } - - private static void buildSpecificOffsets( - ReadableConfig tableOptions, - String topic, - Map<KafkaTopicPartition, Long> specificOffsets) { - String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS); - final Map<Integer, Long> offsetMap = - parseSpecificOffsets(specificOffsetsStrOpt, SCAN_STARTUP_SPECIFIC_OFFSETS.key()); - offsetMap.forEach( - (partition, offset) -> { - final KafkaTopicPartition topicPartition = - new KafkaTopicPartition(topic, partition); - specificOffsets.put(topicPartition, offset); - }); - } - - public static Properties getKafkaProperties(Map<String, String> tableOptions) { - final Properties kafkaProperties = new Properties(); - - if (hasKafkaClientProperties(tableOptions)) { - tableOptions.keySet().stream() - .filter(key -> key.startsWith(PROPERTIES_PREFIX)) - .forEach( - key -> { - final String value = tableOptions.get(key); - final String subKey = key.substring((PROPERTIES_PREFIX).length()); - kafkaProperties.put(subKey, value); - }); - } - return kafkaProperties; - } - - /** - * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class - * name. - */ - public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner( - ReadableConfig tableOptions, ClassLoader classLoader) { - return tableOptions - .getOptional(SINK_PARTITIONER) - .flatMap( - (String partitioner) -> { - switch (partitioner) { - case SINK_PARTITIONER_VALUE_FIXED: - return Optional.of(new FlinkFixedPartitioner<>()); - case SINK_PARTITIONER_VALUE_DEFAULT: - case SINK_PARTITIONER_VALUE_ROUND_ROBIN: - return Optional.empty(); - // Default fallback to full class name of the partitioner. - default: - return Optional.of( - initializePartitioner(partitioner, classLoader)); - } - }); - } - - /** - * Parses SpecificOffsets String to Map. 
- * - * <p>SpecificOffsets String format was given as following: - * - * <pre> - * scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300 - * </pre> - * - * @return SpecificOffsets with Map format, key is partition, and value is offset - */ - public static Map<Integer, Long> parseSpecificOffsets( - String specificOffsetsStr, String optionKey) { - final Map<Integer, Long> offsetMap = new HashMap<>(); - final String[] pairs = specificOffsetsStr.split(";"); - final String validationExceptionMessage = - String.format( - "Invalid properties '%s' should follow the format " - + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.", - optionKey, specificOffsetsStr); - - if (pairs.length == 0) { - throw new ValidationException(validationExceptionMessage); - } - - for (String pair : pairs) { - if (null == pair || pair.length() == 0 || !pair.contains(",")) { - throw new ValidationException(validationExceptionMessage); - } - - final String[] kv = pair.split(","); - if (kv.length != 2 - || !kv[0].startsWith(PARTITION + ':') - || !kv[1].startsWith(OFFSET + ':')) { - throw new ValidationException(validationExceptionMessage); - } - - String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1); - String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1); - try { - final Integer partition = Integer.valueOf(partitionValue); - final Long offset = Long.valueOf(offsetValue); - offsetMap.put(partition, offset); - } catch (NumberFormatException e) { - throw new ValidationException(validationExceptionMessage, e); - } - } - return offsetMap; - } - - /** - * Decides if the table options contains Kafka client properties that start with prefix - * 'properties'. - */ - private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) { - return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); - } - - /** Returns a class value with the given class name. */ - private static <T> FlinkKafkaPartitioner<T> initializePartitioner( - String name, ClassLoader classLoader) { - try { - Class<?> clazz = Class.forName(name, true, classLoader); - if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) { - throw new ValidationException( - String.format( - "Sink partitioner class '%s' should extend from the required class %s", - name, FlinkKafkaPartitioner.class.getName())); - } - @SuppressWarnings("unchecked") - final FlinkKafkaPartitioner<T> kafkaPartitioner = - InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader); - - return kafkaPartitioner; - } catch (ClassNotFoundException | FlinkException e) { - throw new ValidationException( - String.format("Could not find and instantiate partitioner class '%s'", name), - e); - } - } - - /** - * Creates an array of indices that determine which physical fields of the table schema to - * include in the key format and the order that those fields have in the key format. - * - * <p>See {@link #KEY_FORMAT}, {@link #KEY_FIELDS}, and {@link #KEY_FIELDS_PREFIX} for more - * information. 
- */ - public static int[] createKeyFormatProjection( - ReadableConfig options, DataType physicalDataType) { - final LogicalType physicalType = physicalDataType.getLogicalType(); - Preconditions.checkArgument( - hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected."); - final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT); - final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS); - - if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) { - throw new ValidationException( - String.format( - "The option '%s' can only be declared if a key format is defined using '%s'.", - KEY_FIELDS.key(), KEY_FORMAT.key())); - } else if (optionalKeyFormat.isPresent() - && (!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) { - throw new ValidationException( - String.format( - "A key format '%s' requires the declaration of one or more of key fields using '%s'.", - KEY_FORMAT.key(), KEY_FIELDS.key())); - } - - if (!optionalKeyFormat.isPresent()) { - return new int[0]; - } - - final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(""); - - final List<String> keyFields = optionalKeyFields.get(); - final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType); - return keyFields.stream() - .mapToInt( - keyField -> { - final int pos = physicalFields.indexOf(keyField); - // check that field name exists - if (pos < 0) { - throw new ValidationException( - String.format( - "Could not find the field '%s' in the table schema for usage in the key format. " - + "A key field must be a regular, physical column. " - + "The following columns can be selected in the '%s' option:\n" - + "%s", - keyField, KEY_FIELDS.key(), physicalFields)); - } - // check that field name is prefixed correctly - if (!keyField.startsWith(keyPrefix)) { - throw new ValidationException( - String.format( - "All fields in '%s' must be prefixed with '%s' when option '%s' " - + "is set but field '%s' is not prefixed.", - KEY_FIELDS.key(), - keyPrefix, - KEY_FIELDS_PREFIX.key(), - keyField)); - } - return pos; - }) - .toArray(); - } - - /** - * Creates an array of indices that determine which physical fields of the table schema to - * include in the value format. - * - * <p>See {@link #VALUE_FORMAT}, {@link #VALUE_FIELDS_INCLUDE}, and {@link #KEY_FIELDS_PREFIX} - * for more information. - */ - public static int[] createValueFormatProjection( - ReadableConfig options, DataType physicalDataType) { - final LogicalType physicalType = physicalDataType.getLogicalType(); - Preconditions.checkArgument( - hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected."); - final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType); - final IntStream physicalFields = IntStream.range(0, physicalFieldCount); - - final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(""); - - final ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE); - if (strategy == ValueFieldsStrategy.ALL) { - if (keyPrefix.length() > 0) { - throw new ValidationException( - String.format( - "A key prefix is not allowed when option '%s' is set to '%s'. 
" - + "Set it to '%s' instead to avoid field overlaps.", - VALUE_FIELDS_INCLUDE.key(), - ValueFieldsStrategy.ALL, - ValueFieldsStrategy.EXCEPT_KEY)); - } - return physicalFields.toArray(); - } else if (strategy == ValueFieldsStrategy.EXCEPT_KEY) { - final int[] keyProjection = createKeyFormatProjection(options, physicalDataType); - return physicalFields - .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == pos)) - .toArray(); - } - throw new TableException("Unknown value fields strategy:" + strategy); - } - - /** - * Returns a new table context with a default schema registry subject value in the options if - * the format is a schema registry format (e.g. 'avro-confluent') and the subject is not - * defined. - */ - public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject( - DynamicTableFactory.Context context) { - Map<String, String> tableOptions = context.getCatalogTable().getOptions(); - Map<String, String> newOptions = autoCompleteSchemaRegistrySubject(tableOptions); - if (newOptions.size() > tableOptions.size()) { - // build a new context - return new FactoryUtil.DefaultDynamicTableContext( - context.getObjectIdentifier(), - context.getCatalogTable().copy(newOptions), - context.getConfiguration(), - context.getClassLoader(), - context.isTemporary()); - } else { - return context; - } - } - - private static Map<String, String> autoCompleteSchemaRegistrySubject( - Map<String, String> options) { - Configuration configuration = Configuration.fromMap(options); - // the subject autoComplete should only be used in sink, check the topic first - validateSinkTopic(configuration); - final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT); - final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT); - final Optional<String> format = configuration.getOptional(FORMAT); - final String topic = configuration.get(TOPIC).get(0); - - if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) { - autoCompleteSubject(configuration, format.get(), topic + "-value"); - } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) { - autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value"); - } - - if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) { - autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key"); - } - return configuration.toMap(); - } - - private static void autoCompleteSubject( - Configuration configuration, String format, String subject) { - ConfigOption<String> subjectOption = - ConfigOptions.key(format + "." + SCHEMA_REGISTRY_SUBJECT.key()) - .stringType() - .noDefaultValue(); - if (!configuration.getOptional(subjectOption).isPresent()) { - configuration.setString(subjectOption, subject); - } - } - // -------------------------------------------------------------------------------------------- - // Inner classes + // Enums Review comment: Here you did my suggestion from above :D ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOptions.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import java.time.Duration; + +/** Options for the JDBC connector. */ +@PublicEvolving +public class JdbcOptions { Review comment: Shall we call those classes `JdbcConnectorOptions`, `KafkaConnectorOptions`, ...? To avoid clashes with catalog options in the future. ########## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptionsUtil.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; + +/** Utilities for {@link DataGenOptions}. */ +@Internal +public class DataGenOptionsUtil { Review comment: Actually, the task of the `Factory` is to translate options. We could also get rid of most of util classes and move the code to the factory itself. ########## File path: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java ########## @@ -18,12 +18,14 @@ package org.apache.flink.formats.avro.registry.confluent; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import java.util.Map; /** Options for Schema Registry Avro format. */ +@PublicEvolving public class RegistryAvroOptions { Review comment: shall we always use the factory identifier? in this case `AvroConfluentOptions`? ########## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java ########## @@ -29,9 +30,8 @@ import static org.apache.flink.configuration.description.TextElement.text; -/** - * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch. - */ +/** Options for the Elasticsearch connector. 
*/ +@PublicEvolving public class ElasticsearchOptions { /** Review comment: nit: how about we move helper classes all the way down and split each option definition by an empty line for better readability. ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptionUtil.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD; +import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS; +import static org.apache.flink.formats.json.JsonOptions.MAP_NULL_KEY_MODE; +import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT; + +/** Utilities for {@link JsonOptions}. */ +@Internal +public class JsonOptionUtil { + + // -------------------------------------------------------------------------------------------- + // Option enumerations + // -------------------------------------------------------------------------------------------- + + public static final String SQL = "SQL"; + public static final String ISO_8601 = "ISO-8601"; + + public static final Set<String> TIMESTAMP_FORMAT_ENUM = + new HashSet<>(Arrays.asList(SQL, ISO_8601)); + + // The handling mode of null key for map data + public static final String JSON_MAP_NULL_KEY_MODE_FAIL = "FAIL"; + public static final String JSON_MAP_NULL_KEY_MODE_DROP = "DROP"; + public static final String JSON_MAP_NULL_KEY_MODE_LITERAL = "LITERAL"; + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + public static TimestampFormat getTimestampFormat(ReadableConfig config) { Review comment: side comment: why is this not an enum? 
:( ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java ########## @@ -81,63 +73,6 @@ .withDescription( "Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default."); - // -------------------------------------------------------------------------------------------- - // Option enumerations - // -------------------------------------------------------------------------------------------- - - public static final String SQL = "SQL"; - public static final String ISO_8601 = "ISO-8601"; - - public static final Set<String> TIMESTAMP_FORMAT_ENUM = - new HashSet<>(Arrays.asList(SQL, ISO_8601)); - - // The handling mode of null key for map data - public static final String JSON_MAP_NULL_KEY_MODE_FAIL = "FAIL"; - public static final String JSON_MAP_NULL_KEY_MODE_DROP = "DROP"; - public static final String JSON_MAP_NULL_KEY_MODE_LITERAL = "LITERAL"; - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - public static TimestampFormat getTimestampFormat(ReadableConfig config) { - String timestampFormat = config.get(TIMESTAMP_FORMAT); - switch (timestampFormat) { - case SQL: - return TimestampFormat.SQL; - case ISO_8601: - return TimestampFormat.ISO_8601; - default: - throw new TableException( - String.format( - "Unsupported timestamp format '%s'. Validator should have checked that.", - timestampFormat)); - } - } - - /** - * Creates handling mode for null key map data. - * - * <p>See {@link #JSON_MAP_NULL_KEY_MODE_FAIL}, {@link #JSON_MAP_NULL_KEY_MODE_DROP}, and {@link - * #JSON_MAP_NULL_KEY_MODE_LITERAL} for more information. - */ - public static MapNullKeyMode getMapNullKeyMode(ReadableConfig config) { - String mapNullKeyMode = config.get(MAP_NULL_KEY_MODE); - switch (mapNullKeyMode.toUpperCase()) { - case JSON_MAP_NULL_KEY_MODE_FAIL: - return MapNullKeyMode.FAIL; - case JSON_MAP_NULL_KEY_MODE_DROP: - return MapNullKeyMode.DROP; - case JSON_MAP_NULL_KEY_MODE_LITERAL: - return MapNullKeyMode.LITERAL; - default: - throw new TableException( - String.format( - "Unsupported map null key handling mode '%s'. Validator should have checked that.", - mapNullKeyMode)); - } - } - // -------------------------------------------------------------------------------------------- // Inner classes Review comment: nit: `Enums` ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptionUtil.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD; +import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS; +import static org.apache.flink.formats.json.JsonOptions.MAP_NULL_KEY_MODE; +import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT; + +/** Utilities for {@link JsonOptions}. */ +@Internal +public class JsonOptionUtil { Review comment: In Hbase we called it `HBaseOptionsUtil` with `s`. We should strive for consistency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
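For context on the map-typed option suggested in the `getHBaseClientProperties` comment above: below is a minimal, hypothetical sketch using Flink's existing `ConfigOptions.key(...).mapType()` builder. The class name, option key, and description are illustrative only and not taken from this PR.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.util.Collections;
import java.util.Map;

/** Sketch only; not part of this pull request. */
public class ClientPropertiesOptions {

    // A single map-typed option carries all pass-through client properties,
    // instead of collecting every table option that starts with "properties.".
    public static final ConfigOption<Map<String, String>> PROPERTIES =
            ConfigOptions.key("properties")
                    .mapType()
                    .defaultValue(Collections.emptyMap())
                    .withDescription(
                            "Properties forwarded as-is to the underlying client.");
}
```

Reading the option back is then a plain `config.get(PROPERTIES)` rather than scanning the key set for a prefix.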

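Similarly, for the enum-related comments on `RawOptions` and `JsonOptionUtil` above: a hedged sketch of an enum-typed option using the existing `enumType(...)` builder. The enum and option names are placeholders, not code from this PR; values containing special characters such as `ISO-8601` are exactly the case mentioned above that a plain enum constant cannot express today.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Sketch only; not part of this pull request. */
public class MapNullKeyModeOption {

    /** Illustrative enum mirroring the FAIL/DROP/LITERAL string constants. */
    public enum MapNullKeyMode {
        FAIL,
        DROP,
        LITERAL
    }

    // An enum-typed option makes the manual switch/validation over string
    // constants unnecessary; invalid values fail during configuration parsing.
    public static final ConfigOption<MapNullKeyMode> MAP_NULL_KEY_MODE =
            ConfigOptions.key("map-null-key.mode")
                    .enumType(MapNullKeyMode.class)
                    .defaultValue(MapNullKeyMode.FAIL)
                    .withDescription("How to handle map entries with a null key.");
}
```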