This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new fe47462 [FLINK-22471][table][connectors] Improvements to ConfigOption descriptions
fe47462 is described below
commit fe474628448ff988601b7dbb120c90dca9da3f45
Author: Ingo Bürk <[email protected]>
AuthorDate: Mon Apr 26 09:11:12 2021 +0200
[FLINK-22471][table][connectors] Improvements to ConfigOption descriptions
---
.../elasticsearch/table/ElasticsearchOptions.java | 11 +-
.../jdbc/table/JdbcDynamicTableFactory.java | 45 ++++---
.../connectors/kafka/table/KafkaOptions.java | 124 ++++++++++++-------
.../connectors/kinesis/table/KinesisOptions.java | 41 ++++---
.../flink/table/filesystem/FileSystemOptions.java | 136 +++++++++++++--------
5 files changed, 216 insertions(+), 141 deletions(-)
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
index c40b94c..a595055 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
@@ -85,11 +85,11 @@ public class ElasticsearchOptions {
"Failure handling strategy in case
a request to Elasticsearch fails")
.list(
text(
- "\"fail\" (throws an
exception if a request fails and thus causes a job failure),"),
+ "\"fail\" (throws an
exception if a request fails and thus causes a job failure)"),
text(
- "\"ignore\" (ignores
failures and drops the request),"),
+ "\"ignore\" (ignores
failures and drops the request)"),
text(
- "\"retry-rejected\"
(re-adds requests that have failed due to queue capacity saturation),"),
+ "\"retry-rejected\"
(re-adds requests that have failed due to queue capacity saturation)"),
text(
"\"class name\" for
failure handling with a ActionRequestFailureHandler subclass"))
.build());
@@ -143,9 +143,8 @@ public class ElasticsearchOptions {
.stringType()
.defaultValue("json")
.withDescription(
- "Elasticsearch connector requires to specify a
format.\n"
- + "The format must produce a valid json
document. \n"
- + "By default uses built-in 'json' format.
Please refer to Table Formats section for more details.");
+ "The format must produce a valid JSON document. "
+ + "Please refer to the documentation on
formats for more details.");
private ElasticsearchOptions() {}
}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 02ae1d8..5505401 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -58,28 +58,28 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
ConfigOptions.key("url")
.stringType()
.noDefaultValue()
- .withDescription("the jdbc database url.");
+ .withDescription("The JDBC database URL.");
public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
- .withDescription("the jdbc table name.");
+ .withDescription("The JDBC table name.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
- .withDescription("the jdbc user name.");
+ .withDescription("The JDBC user name.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
- .withDescription("the jdbc password.");
+ .withDescription("The JDBC password.");
private static final ConfigOption<String> DRIVER =
ConfigOptions.key("driver")
.stringType()
.noDefaultValue()
.withDescription(
- "the class name of the JDBC driver to use to
connect to this URL. "
+ "The class name of the JDBC driver to use to
connect to this URL. "
+ "If not set, it will automatically be
derived from the URL.");
public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT =
ConfigOptions.key("connection.max-retry-timeout")
@@ -92,37 +92,35 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
ConfigOptions.key("scan.partition.column")
.stringType()
.noDefaultValue()
- .withDescription("the column name used for partitioning
the input.");
+ .withDescription("The column name used for partitioning
the input.");
private static final ConfigOption<Integer> SCAN_PARTITION_NUM =
ConfigOptions.key("scan.partition.num")
.intType()
.noDefaultValue()
- .withDescription("the number of partitions.");
+ .withDescription("The number of partitions.");
private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND =
ConfigOptions.key("scan.partition.lower-bound")
.longType()
.noDefaultValue()
- .withDescription("the smallest value of the first
partition.");
+ .withDescription("The smallest value of the first
partition.");
private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND =
ConfigOptions.key("scan.partition.upper-bound")
.longType()
.noDefaultValue()
- .withDescription("the largest value of the last
partition.");
+ .withDescription("The largest value of the last
partition.");
private static final ConfigOption<Integer> SCAN_FETCH_SIZE =
ConfigOptions.key("scan.fetch-size")
.intType()
.defaultValue(0)
.withDescription(
- "gives the reader a hint as to the number of rows
that should be fetched, from"
- + " the database when reading per round
trip. If the value specified is zero, then the hint is ignored. The"
- + " default value is zero.");
+ "Gives the reader a hint as to the number of rows
that should be fetched "
+ + "from the database per round-trip when
reading. "
+ + "If the value is zero, this hint is
ignored.");
private static final ConfigOption<Boolean> SCAN_AUTO_COMMIT =
ConfigOptions.key("scan.auto-commit")
.booleanType()
.defaultValue(true)
- .withDescription(
- "sets whether the driver is in auto-commit mode.
The default value is true, per"
- + " the JDBC spec.");
+ .withDescription("Sets whether the driver is in
auto-commit mode.");
// look up config options
private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
@@ -130,19 +128,19 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
.longType()
.defaultValue(-1L)
.withDescription(
- "the max number of rows of lookup cache, over this
value, the oldest rows will "
+ "The max number of rows of lookup cache, over this
value, the oldest rows will "
+ "be eliminated. \"cache.max-rows\" and
\"cache.ttl\" options must all be specified if any of them is "
- + "specified. Cache is not enabled as
default.");
+ + "specified.");
private static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
ConfigOptions.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofSeconds(10))
- .withDescription("the cache time to live.");
+ .withDescription("The cache time to live.");
private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
.defaultValue(3)
- .withDescription("the max retry times if lookup database
failed.");
+ .withDescription("The max retry times if lookup database
failed.");
// write config options
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
@@ -150,20 +148,19 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
.intType()
.defaultValue(100)
.withDescription(
- "the flush max size (includes all append, upsert
and delete records), over this number"
- + " of records, will flush data. The
default value is 100.");
+ "The flush max size (includes all append, upsert
and delete records), over this number"
+ + " of records, will flush data.");
private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription(
- "the flush interval mills, over this time,
asynchronous threads will flush data. The "
- + "default value is 1s.");
+ "The flush interval mills, over this time,
asynchronous threads will flush data.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES =
ConfigOptions.key("sink.max-retries")
.intType()
.defaultValue(3)
- .withDescription("the max retry times if writing records
to database failed.");
+ .withDescription("The max retry times if writing records
to database failed.");
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 993af27..de8f451 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.Description;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -51,6 +52,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
+import static org.apache.flink.configuration.description.TextElement.text;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.AT_LEAST_ONCE;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.EXACTLY_ONCE;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.NONE;
@@ -97,32 +99,39 @@ public class KafkaOptions {
.enumType(ValueFieldsStrategy.class)
.defaultValue(ValueFieldsStrategy.ALL)
.withDescription(
- "Defines a strategy how to deal with key columns
in the data type of "
- + "the value format. By default, '"
- + ValueFieldsStrategy.ALL
- + "' physical "
- + "columns of the table schema will be
included in the value format which "
- + "means that key columns appear in the
data type for both the key and value "
- + "format.");
+ String.format(
+ "Defines a strategy how to deal with key
columns in the data type "
+ + "of the value format. By
default, '%s' physical columns of the table schema "
+ + "will be included in the value
format which means that the key columns "
+ + "appear in the data type for
both the key and value format.",
+ ValueFieldsStrategy.ALL));
public static final ConfigOption<String> KEY_FIELDS_PREFIX =
ConfigOptions.key("key.fields-prefix")
.stringType()
.noDefaultValue()
.withDescription(
- "Defines a custom prefix for all fields of the key
format to avoid "
- + "name clashes with fields of the value
format. By default, the prefix is empty. "
- + "If a custom prefix is defined, both the
table schema and "
- + "'"
- + KEY_FIELDS.key()
- + "' will work with prefixed names. When
constructing "
- + "the data type of the key format, the
prefix will be removed and the "
- + "non-prefixed names will be used within
the key format. Please note that this "
- + "option requires that '"
- + VALUE_FIELDS_INCLUDE.key()
- + "' must be '"
- + ValueFieldsStrategy.EXCEPT_KEY
- + "'.");
+ Description.builder()
+ .text(
+ "Defines a custom prefix for all
fields of the key format to avoid "
+ + "name clashes with
fields of the value format. "
+ + "By default, the prefix
is empty.")
+ .linebreak()
+ .text(
+ String.format(
+ "If a custom prefix is
defined, both the table schema and '%s' will work with prefixed names.",
+ KEY_FIELDS.key()))
+ .linebreak()
+ .text(
+ "When constructing the data type
of the key format, the prefix "
+ + "will be removed and the
non-prefixed names will be used within the key format.")
+ .linebreak()
+ .text(
+ String.format(
+ "Please note that this
option requires that '%s' must be '%s'.",
+ VALUE_FIELDS_INCLUDE.key(),
+
ValueFieldsStrategy.EXCEPT_KEY))
+ .build());
//
--------------------------------------------------------------------------------------------
// Kafka specific options
@@ -166,9 +175,16 @@ public class KafkaOptions {
.stringType()
.defaultValue("group-offsets")
.withDescription(
- "Optional startup mode for Kafka consumer, valid
enumerations are "
- + "\"earliest-offset\", \"latest-offset\",
\"group-offsets\", \"timestamp\"\n"
- + "or \"specific-offsets\"");
+ Description.builder()
+ .text(
+ "Optional startup mode for Kafka
consumer, valid enumerations are")
+ .list(
+ text("'earliest-offset'"),
+ text("'latest-offset'"),
+ text("'group-offsets'"),
+ text("'timestamp'"),
+ text("'specific-offsets'"))
+ .build());
public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS =
ConfigOptions.key("scan.startup.specific-offsets")
@@ -200,19 +216,30 @@ public class KafkaOptions {
.stringType()
.defaultValue("default")
.withDescription(
- "Optional output partitioning from Flink's
partitions\n"
- + "into Kafka's partitions valid
enumerations are\n"
- + "\"default\": (use kafka default
partitioner to partition records),\n"
- + "\"fixed\": (each Flink partition ends
up in at most one Kafka partition),\n"
- + "\"round-robin\": (a Flink partition is
distributed to Kafka partitions round-robin when 'key.fields' is not
specified.)\n"
- + "\"custom class name\": (use a custom
FlinkKafkaPartitioner subclass)");
+ Description.builder()
+ .text(
+ "Optional output partitioning from
Flink's partitions into Kafka's partitions. Valid enumerations are")
+ .list(
+ text(
+ "'default' (use kafka
default partitioner to partition records)"),
+ text(
+ "'fixed' (each Flink
partition ends up in at most one Kafka partition)"),
+ text(
+ "'round-robin' (a Flink
partition is distributed to Kafka partitions round-robin when 'key.fields' is
not specified)"),
+ text(
+ "custom class name (use
custom FlinkKafkaPartitioner subclass)"))
+ .build());
public static final ConfigOption<String> SINK_SEMANTIC =
ConfigOptions.key("sink.semantic")
.stringType()
.defaultValue("at-least-once")
.withDescription(
- "Optional semantic when commit. Valid
enumerationns are [\"at-least-once\", \"exactly-once\", \"none\"]");
+ Description.builder()
+ .text(
+ "Optional semantic when
committing. Valid enumerations are")
+ .list(text("at-least-once"),
text("exactly-once"), text("none"))
+ .build());
// Disable this feature by default
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
@@ -220,12 +247,19 @@ public class KafkaOptions {
.intType()
.defaultValue(0)
.withDescription(
- "The max size of buffered records before flush. "
- + "When the sink receives many updates on
the same key, the buffer will retain the last record of the same key. "
- + "This can help to reduce data shuffling
and avoid possible tombstone messages to Kafka topic."
- + "Can be set to '0' to disable it."
- + "Note both 'sink.buffer-flush.max-rows'
and 'sink.buffer-flush.interval' "
- + "must be set to be greater than zero to
enable sink buffer flushing.");
+ Description.builder()
+ .text(
+ "The max size of buffered records
before flushing. "
+ + "When the sink receives
many updates on the same key, "
+ + "the buffer will retain
the last records of the same key. "
+ + "This can help to reduce
data shuffling and avoid possible tombstone messages to the Kafka topic.")
+ .linebreak()
+ .text("Can be set to '0' to disable it.")
+ .linebreak()
+ .text(
+ "Note both
'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' "
+ + "must be set to be
greater than zero to enable sink buffer flushing.")
+ .build());
// Disable this feature by default
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
@@ -233,12 +267,18 @@ public class KafkaOptions {
.durationType()
.defaultValue(Duration.ofSeconds(0))
.withDescription(
- "The flush interval mills, over this time,
asynchronous threads will flush data. "
- + "When the sink receives many updates on
the same key, the buffer will retain the last record of the same key. "
- + "This can help to reduce data shuffling
and avoid possible tombstone messages to Kafka topic."
- + "Can be set to '0' to disable it. "
- + "Note both 'sink.buffer-flush.max-rows'
and 'sink.buffer-flush.interval' "
- + "must be set to be greater than zero to
enable sink buffer flushing.");
+ Description.builder()
+ .text(
+ "The flush interval millis. Over
this time, asynchronous threads "
+ + "will flush data. When
the sink receives many updates on the same key, "
+ + "the buffer will retain
the last record of the same key.")
+ .linebreak()
+ .text("Can be set to '0' to disable it.")
+ .linebreak()
+ .text(
+ "Note both
'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' "
+ + "must be set to be
greater than zero to enable sink buffer flushing.")
+ .build());
private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java
index 5e50a21..b30d401 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.Description;
import org.apache.flink.streaming.connectors.kinesis.FixedKinesisPartitioner;
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
import org.apache.flink.streaming.connectors.kinesis.RandomKinesisPartitioner;
@@ -40,6 +41,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
/**
* Options for Kinesis tables supported by the {@code CREATE TABLE ... WITH
...} clause of the Flink
* SQL dialect and the Flink Table API.
@@ -86,7 +90,7 @@ public class KinesisOptions {
ConfigOptions.key("stream")
.stringType()
.noDefaultValue()
- .withDescription("Name of the Kinesis stream backing this
table (required)");
+ .withDescription("Name of the Kinesis stream backing this
table.");
//
-----------------------------------------------------------------------------------------
// Sink specific options
@@ -97,26 +101,33 @@ public class KinesisOptions {
.stringType()
.noDefaultValue()
.withDescription(
- "Optional output partitioning from Flink's
partitions into Kinesis shards. "
- + "Sinks that write to tables defined with
the PARTITION BY clause "
- + "always use a field-based partitioner
and cannot define this option. "
- + "Valid enumerations are: \n"
- + "\"random\":"
- + " (use a random partition key),\n"
- + "\"fixed\":"
- + " (each Flink partition ends up in at
most one Kinesis shard),\n"
- + "\"custom class name\":"
- + " (use a custom "
- + KinesisPartitioner.class.getName()
- + " subclass)");
+ Description.builder()
+ .text(
+ "Optional output partitioning from
Flink's partitions into Kinesis shards. "
+ + "Sinks that write to
tables defined with the %s clause always use a "
+ + "field-based partitioner
and cannot define this option.",
+ code("PARTITION BY"))
+ .linebreak()
+ .text("Valid enumerations are")
+ .list(
+ text("random (use a random
partition key)"),
+ text(
+ "fixed (each Flink
partition ends up in at most one Kinesis shard)"),
+ text(
+ "custom class name (use a
custom %s subclass)",
+
text(KinesisPartitioner.class.getName())))
+ .build());
public static final ConfigOption<String> SINK_PARTITIONER_FIELD_DELIMITER =
ConfigOptions.key("sink.partitioner-field-delimiter")
.stringType()
.defaultValue("|")
.withDescription(
- "Optional field delimiter for fields-based
partitioner "
- + "derived from a PARTITION BY clause
(\"|\" by default)");
+ Description.builder()
+ .text(
+ "Optional field delimiter for
fields-based partitioner derived from a %s clause",
+ code("PARTITION BY"))
+ .build());
//
-----------------------------------------------------------------------------------------
// Option enumerations
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
index 0f985d6..bfc17e4 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
@@ -20,11 +20,13 @@ package org.apache.flink.table.filesystem;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
/** This class holds configuration constants used by filesystem(Including
hive) connector. */
public class FileSystemOptions {
@@ -38,14 +40,13 @@ public class FileSystemOptions {
.defaultValue("__DEFAULT_PARTITION__")
.withDescription(
"The default partition name in case the dynamic
partition"
- + " column value is null/empty string");
+ + " column value is null/empty string.");
public static final ConfigOption<MemorySize> SINK_ROLLING_POLICY_FILE_SIZE
=
key("sink.rolling-policy.file-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(128))
- .withDescription(
- "The maximum part file size before rolling (by
default 128MB).");
+ .withDescription("The maximum part file size before
rolling.");
public static final ConfigOption<Duration>
SINK_ROLLING_POLICY_ROLLOVER_INTERVAL =
key("sink.rolling-policy.rollover-interval")
@@ -53,7 +54,7 @@ public class FileSystemOptions {
.defaultValue(Duration.ofMinutes(30))
.withDescription(
"The maximum time duration a part file can stay
open before rolling"
- + " (by default 30 min to avoid to many
small files). The frequency at which"
+ + " (by default long enough to avoid too
many small files). The frequency at which"
+ " this is checked is controlled by the
'sink.rolling-policy.check-interval' option.");
public static final ConfigOption<Duration>
SINK_ROLLING_POLICY_CHECK_INTERVAL =
@@ -71,28 +72,34 @@ public class FileSystemOptions {
.withDescription(
"The option to enable shuffle data by dynamic
partition fields in sink"
+ " phase, this can greatly reduce the
number of file for filesystem sink but may"
- + " lead data skew, the default value is
disabled.");
+ + " lead data skew.");
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
key("streaming-source.enable")
.booleanType()
.defaultValue(false)
.withDescription(
- "Enable streaming source or not.\n"
- + " NOTES: Please make sure that each
partition/file should be written"
- + " atomically, otherwise the reader may
get incomplete data.");
+ Description.builder()
+ .text("Enable streaming source or not.")
+ .linebreak()
+ .text(
+ " NOTES: Please make sure that
each partition/file should be written"
+ + " atomically, otherwise
the reader may get incomplete data.")
+ .build());
public static final ConfigOption<String>
STREAMING_SOURCE_PARTITION_INCLUDE =
key("streaming-source.partition.include")
.stringType()
.defaultValue("all")
.withDescription(
- "Option to set the partitions to read, the
supported values "
- + "are \"all\" and \"latest\","
- + " the \"all\" means read all partitions;
the \"latest\" means read latest "
- + "partition in order of
streaming-source.partition.order, the \"latest\" only works"
- + " when the streaming hive source table
used as temporal table. "
- + "By default the option is \"all\".\n.");
+ Description.builder()
+ .text(
+ "Option to set the partitions to
read, supported values are")
+ .list(
+ text("all (read all partitions)"),
+ text(
+ "latest (read latest
partition in order of 'streaming-source.partition.order', this only works when
a streaming Hive source table is used as a temporal table)"))
+ .build());
public static final ConfigOption<Duration>
STREAMING_SOURCE_MONITOR_INTERVAL =
key("streaming-source.monitor-interval")
@@ -106,39 +113,47 @@ public class FileSystemOptions {
.defaultValue("partition-name")
.withDeprecatedKeys("streaming-source.consume-order")
.withDescription(
- "The partition order of streaming source,"
- + " support \"create-time\",
\"partition-time\" and \"partition-name\"."
- + " \"create-time\" compares
partition/file creation time, this is not the"
- + " partition create time in Hive
metaStore, but the folder/file modification"
- + " time in filesystem, if the partition
folder somehow gets updated,"
- + " e.g. add new file into folder, it can
affect how the data is consumed."
- + " \"partition-time\" compares the time
extracted from partition name."
- + " \"partition-name\" compares partition
name's alphabetical order."
- + " This option is equality with
deprecated option \"streaming-source.consume-order\".");
+ Description.builder()
+ .text(
+ "The partition order of the
streaming source, supported values are")
+ .list(
+ text(
+ "create-time (compares
partition/file creation time, which is not the partition creation time in the
Hive metastore, "
+ + "but the
folder/file modification time in the filesystem; e.g., adding a new file into "
+ + "the folder may
affect how the data is consumed)"),
+ text(
+ "partition-time (compares
the time extracted from the partition name)"),
+ text(
+ "partition-name (compares
partition names lexicographically)"))
+ .text(
+ "This is a synonym for the
deprecated 'streaming-source.consume-order' option.")
+ .build());
public static final ConfigOption<String>
STREAMING_SOURCE_CONSUME_START_OFFSET =
key("streaming-source.consume-start-offset")
.stringType()
.noDefaultValue()
.withDescription(
- "Start offset for streaming consuming."
- + " How to parse and compare offsets
depends on your order."
- + " For create-time and partition-time,
should be a timestamp"
- + " string (yyyy-[m]m-[d]d [hh:mm:ss])."
- + " For partition-time, will use partition
time extractor to"
- + " extract time from partition."
- + " For partition-name, is the partition
name string, e.g.:"
- + " pt_year=2020/pt_mon=10/pt_day=01");
+ Description.builder()
+ .text(
+ "Start offset for streaming
consuming. How to parse and compare offsets depends on
'streaming-source.partition-order'.")
+ .list(
+ text(
+ "For 'create-time' and
'partition-time' it should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss])."),
+ text(
+ "For 'partition-time' it
will use a partition time extractor to extract the time from the partition."),
+ text(
+ "For 'partition-name' it
is the name of the partition, e.g. 'pt_year=2020/pt_mon=10/pt_day=01'."))
+ .build());
public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_KIND =
key("partition.time-extractor.kind")
.stringType()
.defaultValue("default")
.withDescription(
- "Time extractor to extract time from partition
values."
- + " Support default and custom."
- + " For default, can configure timestamp
pattern."
- + " For custom, should configure extractor
class.");
+ "Time extractor to extract time from partition
values. "
+ + "This can either be 'default' or a
custom extractor class. "
+ + "For 'default', you can configure a
timestamp pattern.");
public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS =
key("partition.time-extractor.class")
@@ -152,43 +167,56 @@ public class FileSystemOptions {
.stringType()
.noDefaultValue()
.withDescription(
- "The 'default' construction way allows users to
use partition"
- + " fields to get a legal timestamp
pattern."
- + " Default support 'yyyy-mm-dd hh:mm:ss'
from first field."
- + " If timestamp in partition is single
field 'dt', can configure: '$dt'."
- + " If timestamp in partition is year,
month, day, hour,"
- + " can configure: '$year-$month-$day
$hour:00:00'."
- + " If timestamp in partition is dt and
hour, can configure: '$dt $hour:00:00'.");
+ Description.builder()
+ .text(
+ "When
'partition.time-extractor.kind' is set to 'default', "
+ + "you can specify a
pattern to get a timestamp from partitions.")
+ .list(
+ text(
+ "By default, a format of
'yyyy-mm-dd hh:mm:ss' is read from the first field."),
+ text(
+ "If the timestamp in the
partition is a single field called 'dt', you can use '$dt'."),
+ text(
+ "If it is spread across
multiple fields for year, month, day, and hour, you can use '$year-$month-$day
$hour:00:00'."),
+ text(
+ "If the timestamp is in
fields dt and hour, you can use '$dt $hour:00:00'."))
+ .build());
public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL =
key("lookup.join.cache.ttl")
.durationType()
.defaultValue(Duration.ofMinutes(60))
.withDescription(
- "The cache TTL (e.g. 10min) for the build table in
lookup join. "
- + "By default the TTL is 60 minutes.");
+ "The cache TTL (e.g. 10min) for the build table in
lookup join.");
public static final ConfigOption<String> SINK_PARTITION_COMMIT_TRIGGER =
key("sink.partition-commit.trigger")
.stringType()
.defaultValue("process-time")
.withDescription(
- "Trigger type for partition commit:\n"
- + " 'process-time': based on the time of
the machine, it neither requires"
- + " partition time extraction nor
watermark generation. Commit partition"
- + " once the 'current system time' passes
'partition creation system time' plus 'delay'.\n"
- + " 'partition-time': based on the time
that extracted from partition values,"
- + " it requires watermark generation.
Commit partition once the 'watermark'"
- + " passes 'time extracted from partition
values' plus 'delay'.");
+ Description.builder()
+ .text("Trigger type for partition commit,
supported values are")
+ .list(
+ text(
+ "process-time (based on
the time of the machine, requires "
+ + "neither
partition time extraction nor watermark generation; "
+ + "commits
partition once the current system time passes partition creation system time
plus delay)"),
+ text(
+ "partition-time (based on
the time extracted from partition values, "
+ + "requires
watermark generation; commits partition once "
+ + "the watermark
passes the time extracted from partition values plus delay)"))
+ .build());
public static final ConfigOption<Duration> SINK_PARTITION_COMMIT_DELAY =
key("sink.partition-commit.delay")
.durationType()
.defaultValue(Duration.ofMillis(0))
.withDescription(
- "The partition will not commit until the delay
time."
- + " if it is a day partition, should be '1
d',"
- + " if it is a hour partition, should be
'1 h'");
+ Description.builder()
+ .text(
+ "The partition will not commit
until the delay time. "
+ + "The value should be '1
d' for day partitions and '1 h' for hour partitions.")
+ .build());
public static final ConfigOption<String>
SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE =
key("sink.partition-commit.watermark-time-zone")