twalthr commented on a change in pull request #16334: URL: https://github.com/apache/flink/pull/16334#discussion_r666163003
########## File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseConnectorOptions.java ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.hbase.options; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.table.factories.FactoryUtil; + +import java.time.Duration; + +/** Options for the HBase connector. 
*/ +@PublicEvolving +public class HBaseConnectorOptions { + + public static final ConfigOption<String> TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription("The name of HBase table to connect."); + + public static final ConfigOption<String> ZOOKEEPER_QUORUM = + ConfigOptions.key("zookeeper.quorum") + .stringType() + .noDefaultValue() + .withDescription("The HBase Zookeeper quorum."); + + public static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT = + ConfigOptions.key("zookeeper.znode.parent") + .stringType() + .defaultValue("/hbase") + .withDescription("The root dir in Zookeeper for HBase cluster."); + + public static final ConfigOption<String> NULL_STRING_LITERAL = + ConfigOptions.key("null-string-literal") + .stringType() + .defaultValue("null") + .withDescription( + "Representation for null values for string fields. HBase source and " + + "sink encodes/decodes empty bytes as null values for all types except string type."); + + public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE = + ConfigOptions.key("sink.buffer-flush.max-size") + .memoryType() + .defaultValue(MemorySize.parse("2mb")) + .withDescription( + "Writing option, maximum size in memory of buffered rows for each " + + "writing request. This can improve performance for writing data to HBase database, " + + "but may increase the latency. Can be set to '0' to disable it. "); + + public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = + ConfigOptions.key("sink.buffer-flush.max-rows") + .intType() + .defaultValue(1000) + .withDescription( + "Writing option, maximum number of rows to buffer for each writing request. " + + "This can improve performance for writing data to HBase database, but may increase the latency. 
" + + "Can be set to '0' to disable it."); + + public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = + ConfigOptions.key("sink.buffer-flush.interval") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription( + "Writing option, the interval to flush any buffered rows. " + + "This can improve performance for writing data to HBase database, but may increase the latency. " + + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' " + + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions."); + + public static final ConfigOption<Boolean> LOOKUP_ASYNC = + ConfigOptions.key("lookup.async") + .booleanType() + .defaultValue(false) + .withDescription("whether to set async lookup."); + + public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = + ConfigOptions.key("lookup.cache.max-rows") + .longType() + .defaultValue(-1L) + .withDescription( + "the max number of rows of lookup cache, over this value, the oldest rows will " + + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " + + "specified. 
Cache is not enabled as default."); + + public static final ConfigOption<Duration> LOOKUP_CACHE_TTL = + ConfigOptions.key("lookup.cache.ttl") + .durationType() + .defaultValue(Duration.ofSeconds(0)) + .withDescription("the cache time to live."); + + public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = + ConfigOptions.key("lookup.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if lookup database failed."); + + public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; Review comment: private constructor is missing ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java ########## @@ -18,14 +18,14 @@ package org.apache.flink.connector.jdbc.catalog.factory; -import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; import org.apache.flink.table.catalog.CommonCatalogOptions; /** {@link ConfigOption}s for {@link JdbcCatalog}. */ -@Internal +@PublicEvolving Review comment: rename to `JdbcCatalogOptions`? or leave it `@Internal` for now ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** Options for the JSON format. */ +@PublicEvolving +public class JsonFormatOptions { + + public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = + ConfigOptions.key("fail-on-missing-field") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to specify whether to fail if a field is missing or not, false by default."); + + public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = + ConfigOptions.key("ignore-parse-errors") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to skip fields and rows with parse errors instead of failing;\n" + + "fields are set to null in case of errors, false by default."); + + public static final ConfigOption<String> MAP_NULL_KEY_MODE = + ConfigOptions.key("map-null-key.mode") + .stringType() + .defaultValue("FAIL") + .withDescription( + "Optional flag to control the handling mode when serializing null key for map data, FAIL by default." + + " Option DROP will drop null key entries for map data." 
+ + " Option LITERAL will use 'map-null-key.literal' as key literal."); + + public static final ConfigOption<String> MAP_NULL_KEY_LITERAL = + ConfigOptions.key("map-null-key.literal") + .stringType() + .defaultValue("null") + .withDescription( + "Optional flag to specify string literal for null keys when 'map-null-key.mode' is LITERAL, \"null\" by default."); + + public static final ConfigOption<String> TIMESTAMP_FORMAT = + ConfigOptions.key("timestamp-format.standard") + .stringType() + .defaultValue("SQL") + .withDescription( + "Optional flag to specify timestamp format, SQL by default." + + " Option ISO-8601 will parse input timestamp in \"yyyy-MM-ddTHH:mm:ss.s{precision}\" format and output timestamp in the same format." + + " Option SQL will parse input timestamp in \"yyyy-MM-dd HH:mm:ss.s{precision}\" format and output timestamp in the same format."); + + public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER = + ConfigOptions.key("encode.decimal-as-plain-number") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default."); + + // -------------------------------------------------------------------------------------------- + // Enums + // -------------------------------------------------------------------------------------------- + + /** Handling mode for map data with null key. */ + public enum MapNullKeyMode { + FAIL, + DROP, + LITERAL + } Review comment: private constructor missing ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java ########## @@ -741,8 +550,9 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions * Creates an array of indices that determine which physical fields of the table schema to * include in the value format. 
* - * <p>See {@link #VALUE_FORMAT}, {@link #VALUE_FIELDS_INCLUDE}, and {@link #KEY_FIELDS_PREFIX} - * for more information. + * <p>See {@link KafkaConnectorOptions#VALUE_FORMAT}, {@link Review comment: We could have avoided these changes with static imports. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
