twalthr commented on a change in pull request #16334:
URL: https://github.com/apache/flink/pull/16334#discussion_r666055781



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptionsUtil.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.options;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.api.TableSchema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_PARALLELISM;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
+
+/** Utilities for {@link HBaseOptions}. */
+@Internal
+public class HBaseOptionsUtil {

Review comment:
       Is package-private (default) scope possible for these utilities? They should always sit right next to the factory in the same package.
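
       For illustration, a minimal sketch of the package-private variant being suggested (only the visibility changes; package and class names are taken from the diff above):

```java
// Lives in the same package as the HBase factory, so no `public` modifier is needed.
package org.apache.flink.connector.hbase.options;

/** Utilities for {@link HBaseOptions}; intentionally only visible inside this package. */
final class HBaseOptionsUtil {

    // the static helpers from the diff above would stay here unchanged

    private HBaseOptionsUtil() {
        // no instantiation
    }
}
```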

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOptions.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.Duration;
+
+/** Options for the JDBC connector. */
+@PublicEvolving
+public class JdbcOptions {
+
+    public static final ConfigOption<String> URL =
+            ConfigOptions.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC database URL.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC table name.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC user name.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The JDBC password.");
+
+    public static final ConfigOption<String> DRIVER =
+            ConfigOptions.key("driver")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The class name of the JDBC driver to use to connect to this URL. "
+                                    + "If not set, it will automatically be derived from the URL.");
+
+    public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT =
+            ConfigOptions.key("connection.max-retry-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription("Maximum timeout between retries.");
+
+    public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
+
+    // -----------------------------------------------------------------------------------------
+    // Read options

Review comment:
       `Scan` options

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptionsUtil.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.options;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.api.TableSchema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_PARALLELISM;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM;
+import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
+
+/** Utilities for {@link HBaseOptions}. */
+@Internal
+public class HBaseOptionsUtil {
+
+    /** Prefix for HBase specific properties. */
+    public static final String PROPERTIES_PREFIX = "properties.";
+
+    // --------------------------------------------------------------------------------------------
+    // Validation
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Checks that the HBase table have row key defined. A row key is defined as an atomic type, and
+     * column families and qualifiers are defined as ROW type. There shouldn't be multiple atomic
+     * type columns in the schema. The PRIMARY KEY constraint is optional, if exist, the primary key
+     * constraint must be defined on the single row key field.
+     */
+    public static void validatePrimaryKey(TableSchema schema) {
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+        if (!hbaseSchema.getRowKeyName().isPresent()) {
+            throw new IllegalArgumentException(
+                    "HBase table requires to define a row key field. "
+                            + "A row key field is defined as an atomic type, "
+                            + "column families and qualifiers are defined as ROW type.");
+        }
+        schema.getPrimaryKey()
+                .ifPresent(
+                        k -> {
+                            if (k.getColumns().size() > 1) {
+                                throw new IllegalArgumentException(
+                                        "HBase table doesn't support a primary Key on multiple columns. "
+                                                + "The primary key of HBase table must be defined on row key field.");
+                            }
+                            if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
+                                throw new IllegalArgumentException(
+                                        "Primary key of HBase table must be defined on the row key field. "
+                                                + "A row key field is defined as an atomic type, "
+                                                + "column families and qualifiers are defined as ROW type.");
+                            }
+                        });
+    }
+
+    public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions) {
+        HBaseWriteOptions.Builder builder = HBaseWriteOptions.builder();
+        builder.setBufferFlushIntervalMillis(
+                tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+        builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        builder.setBufferFlushMaxSizeInBytes(
+                tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
+        builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
+        return builder.build();
+    }
+
+    public static HBaseLookupOptions getHBaseLookupOptions(ReadableConfig tableOptions) {
+        HBaseLookupOptions.Builder builder = HBaseLookupOptions.builder();
+        builder.setLookupAsync(tableOptions.get(HBaseOptions.LOOKUP_ASYNC));
+        builder.setMaxRetryTimes(tableOptions.get(HBaseOptions.LOOKUP_MAX_RETRIES));
+        builder.setCacheExpireMs(tableOptions.get(HBaseOptions.LOOKUP_CACHE_TTL).toMillis());
+        builder.setCacheMaxSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS));
+        return builder.build();
+    }
+
+    /**
+     * config HBase Configuration.
+     *
+     * @param options properties option
+     */
+    public static Configuration getHBaseConfiguration(Map<String, String> options) {
+        org.apache.flink.configuration.Configuration tableOptions =
+                org.apache.flink.configuration.Configuration.fromMap(options);
+        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
+        // first,
+        Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
+        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.getString(ZOOKEEPER_QUORUM));
+        hbaseClientConf.set(
+                HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.getString(ZOOKEEPER_ZNODE_PARENT));
+        // add HBase properties
+        final Properties properties = getHBaseClientProperties(options);
+        properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), v.toString()));
+        return hbaseClientConf;
+    }
+
+    private static Properties getHBaseClientProperties(Map<String, String> tableOptions) {

Review comment:
       Could we add a `[hotfix]` commit to replace this prefix magic? Nowadays we can replace it with a map option type. The same applies to the Kafka connector and maybe also the Kinesis connector. Otherwise the new options classes are not very useful if they stay incomplete.
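
       For illustration, a rough sketch of the map-typed alternative; the wrapping class, option key, and description below are assumptions, not code from this PR:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/** Hypothetical replacement for the "properties.*" prefix scanning. */
public class HBaseClientPropertiesSketch {

    // A single map-typed option carries all HBase/ZooKeeper client properties.
    public static final ConfigOption<Map<String, String>> PROPERTIES =
            ConfigOptions.key("properties")
                    .mapType()
                    .defaultValue(Collections.emptyMap())
                    .withDescription("Client properties forwarded to the HBase connection as-is.");

    // No key-prefix filtering needed anymore: the framework already parsed the map.
    public static Properties getHBaseClientProperties(ReadableConfig tableOptions) {
        final Properties properties = new Properties();
        properties.putAll(tableOptions.get(PROPERTIES));
        return properties;
    }
}
```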

##########
File path: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java
##########
@@ -18,12 +18,14 @@
 
 package org.apache.flink.formats.avro.registry.confluent;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 import java.util.Map;
 
 /** Options for Schema Registry Avro format. */
+@PublicEvolving
 public class RegistryAvroOptions {
     private RegistryAvroOptions() {}

Review comment:
       nit: move the constructor to the end, as in almost all other classes

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/formats/raw/RawOptions.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.nio.charset.StandardCharsets;
+
+/** Options for the "raw" format. */
+@PublicEvolving
+public class RawOptions {
+
+    public static final String BIG_ENDIAN = "big-endian";

Review comment:
       We should improve our `ConfigOption` design to allow enum values with special characters. It doesn't help to support an enum type if nobody wants to use it and everybody falls back to plain string constants again.
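
       As a sketch of the kind of enum this would enable (the enum, its `fromString` hook, and the idea that the config framework would call it are assumptions, not existing Flink API):

```java
/** Hypothetical enum for the "raw" format's byte order, using hyphenated external values. */
public enum Endianness {
    BIG_ENDIAN("big-endian"),
    LITTLE_ENDIAN("little-endian");

    private final String value;

    Endianness(String value) {
        this.value = value;
    }

    /** External representation used in DDL options, e.g. 'raw.endianness' = 'big-endian'. */
    @Override
    public String toString() {
        return value;
    }

    /** Lookup by external value; an improved ConfigOption design would call something like this. */
    public static Endianness fromString(String value) {
        for (Endianness endianness : values()) {
            if (endianness.value.equalsIgnoreCase(value)) {
                return endianness;
            }
        }
        throw new IllegalArgumentException("Unknown endianness: " + value);
    }
}
```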

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -280,567 +253,10 @@ private KafkaOptions() {}
                                                     + "must be set to be 
greater than zero to enable sink buffer flushing.")
                                     .build());
 
-    private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
-            
ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();
-
-    // 
--------------------------------------------------------------------------------------------
-    // Option enumerations
-    // 
--------------------------------------------------------------------------------------------
-
-    // Start up offset.
-    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = 
"earliest-offset";
-    public static final String SCAN_STARTUP_MODE_VALUE_LATEST = 
"latest-offset";
-    public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = 
"group-offsets";
-    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = 
"specific-offsets";
-    public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
-
-    private static final Set<String> SCAN_STARTUP_MODE_ENUMS =
-            new HashSet<>(
-                    Arrays.asList(
-                            SCAN_STARTUP_MODE_VALUE_EARLIEST,
-                            SCAN_STARTUP_MODE_VALUE_LATEST,
-                            SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
-                            SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
-                            SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
-
-    // Sink partitioner.
-    public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
-    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
-    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = 
"round-robin";
-
-    // Sink semantic
-    public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = 
"exactly-once";
-    public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = 
"at-least-once";
-    public static final String SINK_SEMANTIC_VALUE_NONE = "none";
-
-    private static final Set<String> SINK_SEMANTIC_ENUMS =
-            new HashSet<>(
-                    Arrays.asList(
-                            SINK_SEMANTIC_VALUE_AT_LEAST_ONCE,
-                            SINK_SEMANTIC_VALUE_EXACTLY_ONCE,
-                            SINK_SEMANTIC_VALUE_NONE));
-
-    // Prefix for Kafka specific properties.
-    public static final String PROPERTIES_PREFIX = "properties.";
-
-    // Other keywords.
-    private static final String PARTITION = "partition";
-    private static final String OFFSET = "offset";
-    protected static final String AVRO_CONFLUENT = "avro-confluent";
-    protected static final String DEBEZIUM_AVRO_CONFLUENT = 
"debezium-avro-confluent";
-    private static final List<String> SCHEMA_REGISTRY_FORMATS =
-            Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);
-
-    // 
--------------------------------------------------------------------------------------------
-    // Validation
-    // 
--------------------------------------------------------------------------------------------
-
-    public static void validateTableSourceOptions(ReadableConfig tableOptions) 
{
-        validateSourceTopic(tableOptions);
-        validateScanStartupMode(tableOptions);
-    }
-
-    public static void validateTableSinkOptions(ReadableConfig tableOptions) {
-        validateSinkTopic(tableOptions);
-        validateSinkPartitioner(tableOptions);
-        validateSinkSemantic(tableOptions);
-    }
-
-    public static void validateSourceTopic(ReadableConfig tableOptions) {
-        Optional<List<String>> topic = tableOptions.getOptional(TOPIC);
-        Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
-
-        if (topic.isPresent() && pattern.isPresent()) {
-            throw new ValidationException(
-                    "Option 'topic' and 'topic-pattern' shouldn't be set 
together.");
-        }
-
-        if (!topic.isPresent() && !pattern.isPresent()) {
-            throw new ValidationException("Either 'topic' or 'topic-pattern' 
must be set.");
-        }
-    }
-
-    public static void validateSinkTopic(ReadableConfig tableOptions) {
-        String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but 
got %s: %s.";
-        if (!isSingleTopic(tableOptions)) {
-            if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
-                throw new ValidationException(
-                        String.format(
-                                errorMessageTemp,
-                                "'topic-pattern'",
-                                tableOptions.get(TOPIC_PATTERN)));
-            } else {
-                throw new ValidationException(
-                        String.format(errorMessageTemp, "'topic'", 
tableOptions.get(TOPIC)));
-            }
-        }
-    }
-
-    private static void validateScanStartupMode(ReadableConfig tableOptions) {
-        tableOptions
-                .getOptional(SCAN_STARTUP_MODE)
-                .map(String::toLowerCase)
-                .ifPresent(
-                        mode -> {
-                            if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "Invalid value for option 
'%s'. Supported values are %s, but was: %s",
-                                                SCAN_STARTUP_MODE.key(),
-                                                "[earliest-offset, 
latest-offset, group-offsets, specific-offsets, timestamp]",
-                                                mode));
-                            }
-
-                            if 
(mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) {
-                                if (!tableOptions
-                                        
.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS)
-                                        .isPresent()) {
-                                    throw new ValidationException(
-                                            String.format(
-                                                    "'%s' is required in '%s' 
startup mode"
-                                                            + " but missing.",
-                                                    
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
-                                                    
SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
-                                }
-                            }
-                            if 
(mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
-                                if (!tableOptions
-                                        
.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS)
-                                        .isPresent()) {
-                                    throw new ValidationException(
-                                            String.format(
-                                                    "'%s' is required in '%s' 
startup mode"
-                                                            + " but missing.",
-                                                    
SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
-                                                    
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
-                                }
-                                if (!isSingleTopic(tableOptions)) {
-                                    throw new ValidationException(
-                                            "Currently Kafka source only 
supports specific offset for single topic.");
-                                }
-                                String specificOffsets =
-                                        
tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
-                                parseSpecificOffsets(
-                                        specificOffsets, 
SCAN_STARTUP_SPECIFIC_OFFSETS.key());
-                            }
-                        });
-    }
-
-    private static void validateSinkPartitioner(ReadableConfig tableOptions) {
-        tableOptions
-                .getOptional(SINK_PARTITIONER)
-                .ifPresent(
-                        partitioner -> {
-                            if 
(partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN)
-                                    && 
tableOptions.getOptional(KEY_FIELDS).isPresent()) {
-                                throw new ValidationException(
-                                        "Currently 'round-robin' partitioner 
only works when option 'key.fields' is not specified.");
-                            } else if (partitioner.isEmpty()) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "Option '%s' should be a 
non-empty string.",
-                                                SINK_PARTITIONER.key()));
-                            }
-                        });
-    }
-
-    private static void validateSinkSemantic(ReadableConfig tableOptions) {
-        tableOptions
-                .getOptional(SINK_SEMANTIC)
-                .ifPresent(
-                        semantic -> {
-                            if (!SINK_SEMANTIC_ENUMS.contains(semantic)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "Unsupported value '%s' for 
'%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].",
-                                                semantic, 
SINK_SEMANTIC.key()));
-                            }
-                        });
-    }
-
-    // 
--------------------------------------------------------------------------------------------
-    // Utilities
-    // 
--------------------------------------------------------------------------------------------
-
-    public static KafkaSinkSemantic getSinkSemantic(ReadableConfig 
tableOptions) {
-        switch (tableOptions.get(SINK_SEMANTIC)) {
-            case SINK_SEMANTIC_VALUE_EXACTLY_ONCE:
-                return EXACTLY_ONCE;
-            case SINK_SEMANTIC_VALUE_AT_LEAST_ONCE:
-                return AT_LEAST_ONCE;
-            case SINK_SEMANTIC_VALUE_NONE:
-                return NONE;
-            default:
-                throw new TableException("Validator should have checked that");
-        }
-    }
-
-    public static List<String> getSourceTopics(ReadableConfig tableOptions) {
-        return tableOptions.getOptional(TOPIC).orElse(null);
-    }
-
-    public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
-        return 
tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
-    }
-
-    private static boolean isSingleTopic(ReadableConfig tableOptions) {
-        // Option 'topic-pattern' is regarded as multi-topics.
-        return tableOptions.getOptional(TOPIC).map(t -> t.size() == 
1).orElse(false);
-    }
-
-    public static StartupOptions getStartupOptions(ReadableConfig 
tableOptions) {
-        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
-        final StartupMode startupMode =
-                tableOptions
-                        .getOptional(SCAN_STARTUP_MODE)
-                        .map(
-                                modeString -> {
-                                    switch (modeString) {
-                                        case SCAN_STARTUP_MODE_VALUE_EARLIEST:
-                                            return StartupMode.EARLIEST;
-
-                                        case SCAN_STARTUP_MODE_VALUE_LATEST:
-                                            return StartupMode.LATEST;
-
-                                        case 
SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
-                                            return StartupMode.GROUP_OFFSETS;
-
-                                        case 
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
-                                            // It will be refactored after 
support specific offset
-                                            // for multiple topics in 
FLINK-18602.
-                                            // We have already checked 
tableOptions.get(TOPIC)
-                                            // contains one topic in 
validateScanStartupMode().
-                                            buildSpecificOffsets(
-                                                    tableOptions,
-                                                    
tableOptions.get(TOPIC).get(0),
-                                                    specificOffsets);
-                                            return 
StartupMode.SPECIFIC_OFFSETS;
-
-                                        case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
-                                            return StartupMode.TIMESTAMP;
-
-                                        default:
-                                            throw new TableException(
-                                                    "Unsupported startup mode. 
Validator should have checked that.");
-                                    }
-                                })
-                        .orElse(StartupMode.GROUP_OFFSETS);
-        final StartupOptions options = new StartupOptions();
-        options.startupMode = startupMode;
-        options.specificOffsets = specificOffsets;
-        if (startupMode == StartupMode.TIMESTAMP) {
-            options.startupTimestampMillis = 
tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
-        }
-        return options;
-    }
-
-    private static void buildSpecificOffsets(
-            ReadableConfig tableOptions,
-            String topic,
-            Map<KafkaTopicPartition, Long> specificOffsets) {
-        String specificOffsetsStrOpt = 
tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
-        final Map<Integer, Long> offsetMap =
-                parseSpecificOffsets(specificOffsetsStrOpt, 
SCAN_STARTUP_SPECIFIC_OFFSETS.key());
-        offsetMap.forEach(
-                (partition, offset) -> {
-                    final KafkaTopicPartition topicPartition =
-                            new KafkaTopicPartition(topic, partition);
-                    specificOffsets.put(topicPartition, offset);
-                });
-    }
-
-    public static Properties getKafkaProperties(Map<String, String> 
tableOptions) {
-        final Properties kafkaProperties = new Properties();
-
-        if (hasKafkaClientProperties(tableOptions)) {
-            tableOptions.keySet().stream()
-                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
-                    .forEach(
-                            key -> {
-                                final String value = tableOptions.get(key);
-                                final String subKey = 
key.substring((PROPERTIES_PREFIX).length());
-                                kafkaProperties.put(subKey, value);
-                            });
-        }
-        return kafkaProperties;
-    }
-
-    /**
-     * The partitioner can be either "fixed", "round-robin" or a customized 
partitioner full class
-     * name.
-     */
-    public static Optional<FlinkKafkaPartitioner<RowData>> 
getFlinkKafkaPartitioner(
-            ReadableConfig tableOptions, ClassLoader classLoader) {
-        return tableOptions
-                .getOptional(SINK_PARTITIONER)
-                .flatMap(
-                        (String partitioner) -> {
-                            switch (partitioner) {
-                                case SINK_PARTITIONER_VALUE_FIXED:
-                                    return Optional.of(new 
FlinkFixedPartitioner<>());
-                                case SINK_PARTITIONER_VALUE_DEFAULT:
-                                case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
-                                    return Optional.empty();
-                                    // Default fallback to full class name of 
the partitioner.
-                                default:
-                                    return Optional.of(
-                                            initializePartitioner(partitioner, 
classLoader));
-                            }
-                        });
-    }
-
-    /**
-     * Parses SpecificOffsets String to Map.
-     *
-     * <p>SpecificOffsets String format was given as following:
-     *
-     * <pre>
-     *     scan.startup.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
-     * </pre>
-     *
-     * @return SpecificOffsets with Map format, key is partition, and value is 
offset
-     */
-    public static Map<Integer, Long> parseSpecificOffsets(
-            String specificOffsetsStr, String optionKey) {
-        final Map<Integer, Long> offsetMap = new HashMap<>();
-        final String[] pairs = specificOffsetsStr.split(";");
-        final String validationExceptionMessage =
-                String.format(
-                        "Invalid properties '%s' should follow the format "
-                                + 
"'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
-                        optionKey, specificOffsetsStr);
-
-        if (pairs.length == 0) {
-            throw new ValidationException(validationExceptionMessage);
-        }
-
-        for (String pair : pairs) {
-            if (null == pair || pair.length() == 0 || !pair.contains(",")) {
-                throw new ValidationException(validationExceptionMessage);
-            }
-
-            final String[] kv = pair.split(",");
-            if (kv.length != 2
-                    || !kv[0].startsWith(PARTITION + ':')
-                    || !kv[1].startsWith(OFFSET + ':')) {
-                throw new ValidationException(validationExceptionMessage);
-            }
-
-            String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
-            String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
-            try {
-                final Integer partition = Integer.valueOf(partitionValue);
-                final Long offset = Long.valueOf(offsetValue);
-                offsetMap.put(partition, offset);
-            } catch (NumberFormatException e) {
-                throw new ValidationException(validationExceptionMessage, e);
-            }
-        }
-        return offsetMap;
-    }
-
-    /**
-     * Decides if the table options contains Kafka client properties that 
start with prefix
-     * 'properties'.
-     */
-    private static boolean hasKafkaClientProperties(Map<String, String> 
tableOptions) {
-        return tableOptions.keySet().stream().anyMatch(k -> 
k.startsWith(PROPERTIES_PREFIX));
-    }
-
-    /** Returns a class value with the given class name. */
-    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
-            String name, ClassLoader classLoader) {
-        try {
-            Class<?> clazz = Class.forName(name, true, classLoader);
-            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
-                throw new ValidationException(
-                        String.format(
-                                "Sink partitioner class '%s' should extend 
from the required class %s",
-                                name, FlinkKafkaPartitioner.class.getName()));
-            }
-            @SuppressWarnings("unchecked")
-            final FlinkKafkaPartitioner<T> kafkaPartitioner =
-                    InstantiationUtil.instantiate(name, 
FlinkKafkaPartitioner.class, classLoader);
-
-            return kafkaPartitioner;
-        } catch (ClassNotFoundException | FlinkException e) {
-            throw new ValidationException(
-                    String.format("Could not find and instantiate partitioner 
class '%s'", name),
-                    e);
-        }
-    }
-
-    /**
-     * Creates an array of indices that determine which physical fields of the 
table schema to
-     * include in the key format and the order that those fields have in the 
key format.
-     *
-     * <p>See {@link #KEY_FORMAT}, {@link #KEY_FIELDS}, and {@link 
#KEY_FIELDS_PREFIX} for more
-     * information.
-     */
-    public static int[] createKeyFormatProjection(
-            ReadableConfig options, DataType physicalDataType) {
-        final LogicalType physicalType = physicalDataType.getLogicalType();
-        Preconditions.checkArgument(
-                hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type 
expected.");
-        final Optional<String> optionalKeyFormat = 
options.getOptional(KEY_FORMAT);
-        final Optional<List<String>> optionalKeyFields = 
options.getOptional(KEY_FIELDS);
-
-        if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "The option '%s' can only be declared if a key 
format is defined using '%s'.",
-                            KEY_FIELDS.key(), KEY_FORMAT.key()));
-        } else if (optionalKeyFormat.isPresent()
-                && (!optionalKeyFields.isPresent() || 
optionalKeyFields.get().size() == 0)) {
-            throw new ValidationException(
-                    String.format(
-                            "A key format '%s' requires the declaration of one 
or more of key fields using '%s'.",
-                            KEY_FORMAT.key(), KEY_FIELDS.key()));
-        }
-
-        if (!optionalKeyFormat.isPresent()) {
-            return new int[0];
-        }
-
-        final String keyPrefix = 
options.getOptional(KEY_FIELDS_PREFIX).orElse("");
-
-        final List<String> keyFields = optionalKeyFields.get();
-        final List<String> physicalFields = 
LogicalTypeChecks.getFieldNames(physicalType);
-        return keyFields.stream()
-                .mapToInt(
-                        keyField -> {
-                            final int pos = physicalFields.indexOf(keyField);
-                            // check that field name exists
-                            if (pos < 0) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "Could not find the field '%s' 
in the table schema for usage in the key format. "
-                                                        + "A key field must be 
a regular, physical column. "
-                                                        + "The following 
columns can be selected in the '%s' option:\n"
-                                                        + "%s",
-                                                keyField, KEY_FIELDS.key(), 
physicalFields));
-                            }
-                            // check that field name is prefixed correctly
-                            if (!keyField.startsWith(keyPrefix)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "All fields in '%s' must be 
prefixed with '%s' when option '%s' "
-                                                        + "is set but field 
'%s' is not prefixed.",
-                                                KEY_FIELDS.key(),
-                                                keyPrefix,
-                                                KEY_FIELDS_PREFIX.key(),
-                                                keyField));
-                            }
-                            return pos;
-                        })
-                .toArray();
-    }
-
-    /**
-     * Creates an array of indices that determine which physical fields of the 
table schema to
-     * include in the value format.
-     *
-     * <p>See {@link #VALUE_FORMAT}, {@link #VALUE_FIELDS_INCLUDE}, and {@link 
#KEY_FIELDS_PREFIX}
-     * for more information.
-     */
-    public static int[] createValueFormatProjection(
-            ReadableConfig options, DataType physicalDataType) {
-        final LogicalType physicalType = physicalDataType.getLogicalType();
-        Preconditions.checkArgument(
-                hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type 
expected.");
-        final int physicalFieldCount = 
LogicalTypeChecks.getFieldCount(physicalType);
-        final IntStream physicalFields = IntStream.range(0, 
physicalFieldCount);
-
-        final String keyPrefix = 
options.getOptional(KEY_FIELDS_PREFIX).orElse("");
-
-        final ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE);
-        if (strategy == ValueFieldsStrategy.ALL) {
-            if (keyPrefix.length() > 0) {
-                throw new ValidationException(
-                        String.format(
-                                "A key prefix is not allowed when option '%s' 
is set to '%s'. "
-                                        + "Set it to '%s' instead to avoid 
field overlaps.",
-                                VALUE_FIELDS_INCLUDE.key(),
-                                ValueFieldsStrategy.ALL,
-                                ValueFieldsStrategy.EXCEPT_KEY));
-            }
-            return physicalFields.toArray();
-        } else if (strategy == ValueFieldsStrategy.EXCEPT_KEY) {
-            final int[] keyProjection = createKeyFormatProjection(options, 
physicalDataType);
-            return physicalFields
-                    .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> 
k == pos))
-                    .toArray();
-        }
-        throw new TableException("Unknown value fields strategy:" + strategy);
-    }
-
-    /**
-     * Returns a new table context with a default schema registry subject 
value in the options if
-     * the format is a schema registry format (e.g. 'avro-confluent') and the 
subject is not
-     * defined.
-     */
-    public static DynamicTableFactory.Context 
autoCompleteSchemaRegistrySubject(
-            DynamicTableFactory.Context context) {
-        Map<String, String> tableOptions = 
context.getCatalogTable().getOptions();
-        Map<String, String> newOptions = 
autoCompleteSchemaRegistrySubject(tableOptions);
-        if (newOptions.size() > tableOptions.size()) {
-            // build a new context
-            return new FactoryUtil.DefaultDynamicTableContext(
-                    context.getObjectIdentifier(),
-                    context.getCatalogTable().copy(newOptions),
-                    context.getConfiguration(),
-                    context.getClassLoader(),
-                    context.isTemporary());
-        } else {
-            return context;
-        }
-    }
-
-    private static Map<String, String> autoCompleteSchemaRegistrySubject(
-            Map<String, String> options) {
-        Configuration configuration = Configuration.fromMap(options);
-        // the subject autoComplete should only be used in sink, check the 
topic first
-        validateSinkTopic(configuration);
-        final Optional<String> valueFormat = 
configuration.getOptional(VALUE_FORMAT);
-        final Optional<String> keyFormat = 
configuration.getOptional(KEY_FORMAT);
-        final Optional<String> format = configuration.getOptional(FORMAT);
-        final String topic = configuration.get(TOPIC).get(0);
-
-        if (format.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
-            autoCompleteSubject(configuration, format.get(), topic + "-value");
-        } else if (valueFormat.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
-            autoCompleteSubject(configuration, "value." + valueFormat.get(), 
topic + "-value");
-        }
-
-        if (keyFormat.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
-            autoCompleteSubject(configuration, "key." + keyFormat.get(), topic 
+ "-key");
-        }
-        return configuration.toMap();
-    }
-
-    private static void autoCompleteSubject(
-            Configuration configuration, String format, String subject) {
-        ConfigOption<String> subjectOption =
-                ConfigOptions.key(format + "." + SCHEMA_REGISTRY_SUBJECT.key())
-                        .stringType()
-                        .noDefaultValue();
-        if (!configuration.getOptional(subjectOption).isPresent()) {
-            configuration.setString(subjectOption, subject);
-        }
-    }
-
     // --------------------------------------------------------------------------------------------
-    // Inner classes
+    // Enums

Review comment:
       Here you already applied my suggestion from above :D

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcOptions.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.Duration;
+
+/** Options for the JDBC connector. */
+@PublicEvolving
+public class JdbcOptions {

Review comment:
       Shall we call those classes `JdbcConnectorOptions`, `KafkaConnectorOptions`, ...? That would avoid clashes with catalog options in the future.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptionsUtil.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+
+/** Utilities for {@link DataGenOptions}. */
+@Internal
+public class DataGenOptionsUtil {

Review comment:
       Actually, translating options is the task of the `Factory`. We could also get rid of most of the util classes and move that code into the factory itself.

##########
File path: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java
##########
@@ -18,12 +18,14 @@
 
 package org.apache.flink.formats.avro.registry.confluent;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 import java.util.Map;
 
 /** Options for Schema Registry Avro format. */
+@PublicEvolving
 public class RegistryAvroOptions {

Review comment:
       Shall we always use the factory identifier? In this case that would be `AvroConfluentOptions`.

##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
##########
@@ -29,9 +30,8 @@
 
 import static org.apache.flink.configuration.description.TextElement.text;
 
-/**
- * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
- */
+/** Options for the Elasticsearch connector. */
+@PublicEvolving
 public class ElasticsearchOptions {
     /**

Review comment:
       nit: how about we move the helper classes all the way down and separate each option definition with an empty line for better readability?
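
       For illustration, a minimal sketch of that layout (the options and the enum below are placeholders, not the actual Elasticsearch options): one blank line between option definitions, helper classes and enums at the very bottom.

```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

/** Placeholder options class demonstrating the suggested layout. */
@PublicEvolving
public class ExampleConnectorOptions {

    public static final ConfigOption<String> HOSTS =
            ConfigOptions.key("hosts").stringType().noDefaultValue();

    public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL =
            ConfigOptions.key("sink.bulk-flush.interval").durationType().noDefaultValue();

    // --------------------------------------------------------------------------------------------
    // Helper classes and enums go all the way down
    // --------------------------------------------------------------------------------------------

    /** Example enum kept below the option definitions. */
    public enum BackOffType {
        DISABLED,
        CONSTANT,
        EXPONENTIAL
    }

    private ExampleConnectorOptions() {}
}
```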

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptionUtil.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
+import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.JsonOptions.MAP_NULL_KEY_MODE;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
+
+/** Utilities for {@link JsonOptions}. */
+@Internal
+public class JsonOptionUtil {
+
+    // --------------------------------------------------------------------------------------------
+    // Option enumerations
+    // --------------------------------------------------------------------------------------------
+
+    public static final String SQL = "SQL";
+    public static final String ISO_8601 = "ISO-8601";
+
+    public static final Set<String> TIMESTAMP_FORMAT_ENUM =
+            new HashSet<>(Arrays.asList(SQL, ISO_8601));
+
+    // The handling mode of null key for map data
+    public static final String JSON_MAP_NULL_KEY_MODE_FAIL = "FAIL";
+    public static final String JSON_MAP_NULL_KEY_MODE_DROP = "DROP";
+    public static final String JSON_MAP_NULL_KEY_MODE_LITERAL = "LITERAL";
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    public static TimestampFormat getTimestampFormat(ReadableConfig config) {

Review comment:
       side comment: why is this not an enum? :( 
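
       For illustration, a sketch of an enum-typed variant of this option (the wrapping class and description are assumptions; mapping the external value "ISO-8601" onto `ISO_8601` would need the special-character handling for enums discussed in an earlier comment):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.formats.common.TimestampFormat;

/** Hypothetical enum-typed replacement for the string-typed timestamp format option. */
public class JsonTimestampFormatSketch {

    public static final ConfigOption<TimestampFormat> TIMESTAMP_FORMAT =
            ConfigOptions.key("timestamp-format.standard")
                    .enumType(TimestampFormat.class)
                    .defaultValue(TimestampFormat.SQL)
                    .withDescription("Timestamp format used when parsing and formatting timestamps.");
}
```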

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
##########
@@ -81,63 +73,6 @@
                     .withDescription(
                             "Optional flag to specify whether to encode all 
decimals as plain numbers instead of possible scientific notations, false by 
default.");
 
-    // --------------------------------------------------------------------------------------------
-    // Option enumerations
-    // --------------------------------------------------------------------------------------------
-
-    public static final String SQL = "SQL";
-    public static final String ISO_8601 = "ISO-8601";
-
-    public static final Set<String> TIMESTAMP_FORMAT_ENUM =
-            new HashSet<>(Arrays.asList(SQL, ISO_8601));
-
-    // The handling mode of null key for map data
-    public static final String JSON_MAP_NULL_KEY_MODE_FAIL = "FAIL";
-    public static final String JSON_MAP_NULL_KEY_MODE_DROP = "DROP";
-    public static final String JSON_MAP_NULL_KEY_MODE_LITERAL = "LITERAL";
-
-    // --------------------------------------------------------------------------------------------
-    // Utilities
-    // --------------------------------------------------------------------------------------------
-
-    public static TimestampFormat getTimestampFormat(ReadableConfig config) {
-        String timestampFormat = config.get(TIMESTAMP_FORMAT);
-        switch (timestampFormat) {
-            case SQL:
-                return TimestampFormat.SQL;
-            case ISO_8601:
-                return TimestampFormat.ISO_8601;
-            default:
-                throw new TableException(
-                        String.format(
-                                "Unsupported timestamp format '%s'. Validator should have checked that.",
-                                timestampFormat));
-        }
-    }
-
-    /**
-     * Creates handling mode for null key map data.
-     *
-     * <p>See {@link #JSON_MAP_NULL_KEY_MODE_FAIL}, {@link #JSON_MAP_NULL_KEY_MODE_DROP}, and {@link
-     * #JSON_MAP_NULL_KEY_MODE_LITERAL} for more information.
-     */
-    public static MapNullKeyMode getMapNullKeyMode(ReadableConfig config) {
-        String mapNullKeyMode = config.get(MAP_NULL_KEY_MODE);
-        switch (mapNullKeyMode.toUpperCase()) {
-            case JSON_MAP_NULL_KEY_MODE_FAIL:
-                return MapNullKeyMode.FAIL;
-            case JSON_MAP_NULL_KEY_MODE_DROP:
-                return MapNullKeyMode.DROP;
-            case JSON_MAP_NULL_KEY_MODE_LITERAL:
-                return MapNullKeyMode.LITERAL;
-            default:
-                throw new TableException(
-                        String.format(
-                                "Unsupported map null key handling mode '%s'. Validator should have checked that.",
-                                mapNullKeyMode));
-        }
-    }
-
     // --------------------------------------------------------------------------------------------
     // Inner classes

Review comment:
       nit: `Enums`

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptionUtil.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
+import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.JsonOptions.MAP_NULL_KEY_MODE;
+import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
+
+/** Utilities for {@link JsonOptions}. */
+@Internal
+public class JsonOptionUtil {

Review comment:
       In HBase we called it `HBaseOptionsUtil`, with an `s`. We should strive for consistency.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

