twalthr commented on a change in pull request #16334:
URL: https://github.com/apache/flink/pull/16334#discussion_r666163003



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseConnectorOptions.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.options;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.Duration;
+
+/** Options for the HBase connector. */
+@PublicEvolving
+public class HBaseConnectorOptions {
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The name of the HBase table to connect to.");
+
+    public static final ConfigOption<String> ZOOKEEPER_QUORUM =
+            ConfigOptions.key("zookeeper.quorum")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The HBase Zookeeper quorum.");
+
+    public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT =
+            ConfigOptions.key("zookeeper.znode.parent")
+                    .stringType()
+                    .defaultValue("/hbase")
+                    .withDescription("The root dir in Zookeeper for the HBase cluster.");
+
+    public static final ConfigOption<String> NULL_STRING_LITERAL =
+            ConfigOptions.key("null-string-literal")
+                    .stringType()
+                    .defaultValue("null")
+                    .withDescription(
+                            "Representation for null values for string fields. HBase source and "
+                                    + "sink encode/decode empty bytes as null values for all types except string type.");
+
+    public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE =
+            ConfigOptions.key("sink.buffer-flush.max-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("2mb"))
+                    .withDescription(
+                            "Writing option, maximum size in memory of buffered rows for each "
+                                    + "writing request. This can improve performance for writing data to the HBase database, "
+                                    + "but may increase the latency. Can be set to '0' to disable it.");
+
+    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
+            ConfigOptions.key("sink.buffer-flush.max-rows")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Writing option, maximum number of rows to buffer for each writing request. "
+                                    + "This can improve performance for writing data to the HBase database, but may increase the latency. "
+                                    + "Can be set to '0' to disable it.");
+
+    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
+            ConfigOptions.key("sink.buffer-flush.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription(
+                            "Writing option, the interval to flush any buffered rows. "
+                                    + "This can improve performance for writing data to the HBase database, but may increase the latency. "
+                                    + "Can be set to '0' to disable it. Note that both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+                                    + "can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions.");
+
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+            ConfigOptions.key("lookup.async")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable async lookup.");
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions.key("lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The max number of rows of lookup cache; when this value is exceeded, the oldest rows will "
+                                    + "be evicted. \"cache.max-rows\" and \"cache.ttl\" options must both be specified if either of them is "
+                                    + "specified. The cache is not enabled by default.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+            ConfigOptions.key("lookup.cache.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(0))
+                    .withDescription("The cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+            ConfigOptions.key("lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The max retry times if a lookup to the database fails.");
+
+    public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

Review comment:
       private constructor is missing
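       A sketch of what that could look like, assuming the class is meant to be a
       pure constants holder like the other `*Options` classes:

       ```java
       /** Options for the HBase connector. */
       @PublicEvolving
       public class HBaseConnectorOptions {

           // ... the ConfigOption constants from above ...

           /** Prevents instantiation; this class only holds constants. */
           private HBaseConnectorOptions() {}
       }
       ```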

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
##########
@@ -18,14 +18,14 @@
 
 package org.apache.flink.connector.jdbc.catalog.factory;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 
 /** {@link ConfigOption}s for {@link JdbcCatalog}. */
-@Internal
+@PublicEvolving

Review comment:
       rename to `JdbcCatalogOptions`? or leave it `@Internal` for now
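       For illustration only, since the name is still open (the `JdbcCatalogOptions`
       class below is hypothetical):

       ```java
       /** {@link ConfigOption}s for {@link JdbcCatalog}. */
       @PublicEvolving
       public class JdbcCatalogOptions {

           // ... existing ConfigOption constants ...

           private JdbcCatalogOptions() {}
       }
       ```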

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for the JSON format. */
+@PublicEvolving
+public class JsonFormatOptions {
+
+    public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD =
+            ConfigOptions.key("fail-on-missing-field")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to specify whether to fail if a field is missing, false by default.");
+
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                                    + "fields are set to null in case of errors, false by default.");
+
+    public static final ConfigOption<String> MAP_NULL_KEY_MODE =
+            ConfigOptions.key("map-null-key.mode")
+                    .stringType()
+                    .defaultValue("FAIL")
+                    .withDescription(
+                            "Optional flag to control the handling mode when serializing null keys for map data, FAIL by default."
+                                    + " Option DROP will drop null-key entries for map data."
+                                    + " Option LITERAL will use 'map-null-key.literal' as the key literal.");
+
+    public static final ConfigOption<String> MAP_NULL_KEY_LITERAL =
+            ConfigOptions.key("map-null-key.literal")
+                    .stringType()
+                    .defaultValue("null")
+                    .withDescription(
+                            "Optional flag to specify the string literal for null keys when 'map-null-key.mode' is LITERAL, \"null\" by default.");
+
+    public static final ConfigOption<String> TIMESTAMP_FORMAT =
+            ConfigOptions.key("timestamp-format.standard")
+                    .stringType()
+                    .defaultValue("SQL")
+                    .withDescription(
+                            "Optional flag to specify the timestamp format, SQL by default."
+                                    + " Option ISO-8601 will parse input timestamps in \"yyyy-MM-ddTHH:mm:ss.s{precision}\" format and output timestamps in the same format."
+                                    + " Option SQL will parse input timestamps in \"yyyy-MM-dd HH:mm:ss.s{precision}\" format and output timestamps in the same format.");
+
+    public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER =
+            ConfigOptions.key("encode.decimal-as-plain-number")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notation, false by default.");
+
+    // --------------------------------------------------------------------------------------------
+    // Enums
+    // --------------------------------------------------------------------------------------------
+
+    /** Handling mode for map data with null key. */
+    public enum MapNullKeyMode {
+        FAIL,
+        DROP,
+        LITERAL
+    }

Review comment:
       private constructor missing
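       Same fix as in `HBaseConnectorOptions` above. As a side note, a minimal sketch
       of how the string-typed option could be mapped onto the enum (the
       `mapNullKeyMode` helper is hypothetical, not part of this PR):

       ```java
       import org.apache.flink.configuration.ReadableConfig;

       import java.util.Locale;

       // Hypothetical helper: reads the string option and maps it onto the enum.
       static JsonFormatOptions.MapNullKeyMode mapNullKeyMode(ReadableConfig config) {
           String mode = config.get(JsonFormatOptions.MAP_NULL_KEY_MODE);
           // valueOf expects the upper-case constant names FAIL, DROP, LITERAL.
           return JsonFormatOptions.MapNullKeyMode.valueOf(mode.toUpperCase(Locale.ROOT));
       }
       ```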

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -741,8 +550,9 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
      * Creates an array of indices that determine which physical fields of the table schema to
      * include in the value format.
      *
-     * <p>See {@link #VALUE_FORMAT}, {@link #VALUE_FIELDS_INCLUDE}, and {@link #KEY_FIELDS_PREFIX}
-     * for more information.
+     * <p>See {@link KafkaConnectorOptions#VALUE_FORMAT}, {@link

Review comment:
       We could have avoided these changes with static imports.
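       For example (a sketch; this mainly helps the code references, since `{@link}`
       resolution of static imports depends on the javadoc version):

       ```java
       import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
       import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
       import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;

       // Call sites in KafkaConnectorOptionsUtil could then keep the short names:
       //   tableOptions.getOptional(VALUE_FORMAT);
       //   tableOptions.get(VALUE_FIELDS_INCLUDE);
       ```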




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
