This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new fe47462  [FLINK-22471][table][connectors] Improvements to ConfigOption 
descriptions
fe47462 is described below

commit fe474628448ff988601b7dbb120c90dca9da3f45
Author: Ingo Bürk <[email protected]>
AuthorDate: Mon Apr 26 09:11:12 2021 +0200

    [FLINK-22471][table][connectors] Improvements to ConfigOption descriptions
---
 .../elasticsearch/table/ElasticsearchOptions.java  |  11 +-
 .../jdbc/table/JdbcDynamicTableFactory.java        |  45 ++++---
 .../connectors/kafka/table/KafkaOptions.java       | 124 ++++++++++++-------
 .../connectors/kinesis/table/KinesisOptions.java   |  41 ++++---
 .../flink/table/filesystem/FileSystemOptions.java  | 136 +++++++++++++--------
 5 files changed, 216 insertions(+), 141 deletions(-)

diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
index c40b94c..a595055 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
@@ -85,11 +85,11 @@ public class ElasticsearchOptions {
                                             "Failure handling strategy in case 
a request to Elasticsearch fails")
                                     .list(
                                             text(
-                                                    "\"fail\" (throws an 
exception if a request fails and thus causes a job failure),"),
+                                                    "\"fail\" (throws an 
exception if a request fails and thus causes a job failure)"),
                                             text(
-                                                    "\"ignore\" (ignores 
failures and drops the request),"),
+                                                    "\"ignore\" (ignores 
failures and drops the request)"),
                                             text(
-                                                    "\"retry-rejected\" 
(re-adds requests that have failed due to queue capacity saturation),"),
+                                                    "\"retry-rejected\" 
(re-adds requests that have failed due to queue capacity saturation)"),
                                             text(
                                                     "\"class name\" for 
failure handling with a ActionRequestFailureHandler subclass"))
                                     .build());
@@ -143,9 +143,8 @@ public class ElasticsearchOptions {
                     .stringType()
                     .defaultValue("json")
                     .withDescription(
-                            "Elasticsearch connector requires to specify a 
format.\n"
-                                    + "The format must produce a valid json 
document. \n"
-                                    + "By default uses built-in 'json' format. 
Please refer to Table Formats section for more details.");
+                            "The format must produce a valid JSON document. "
+                                    + "Please refer to the documentation on 
formats for more details.");
 
     private ElasticsearchOptions() {}
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 02ae1d8..5505401 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -58,28 +58,28 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
             ConfigOptions.key("url")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("the jdbc database url.");
+                    .withDescription("The JDBC database URL.");
     public static final ConfigOption<String> TABLE_NAME =
             ConfigOptions.key("table-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("the jdbc table name.");
+                    .withDescription("The JDBC table name.");
     public static final ConfigOption<String> USERNAME =
             ConfigOptions.key("username")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("the jdbc user name.");
+                    .withDescription("The JDBC user name.");
     public static final ConfigOption<String> PASSWORD =
             ConfigOptions.key("password")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("the jdbc password.");
+                    .withDescription("The JDBC password.");
     private static final ConfigOption<String> DRIVER =
             ConfigOptions.key("driver")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "the class name of the JDBC driver to use to 
connect to this URL. "
+                            "The class name of the JDBC driver to use to 
connect to this URL. "
                                     + "If not set, it will automatically be 
derived from the URL.");
     public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT =
             ConfigOptions.key("connection.max-retry-timeout")
@@ -92,37 +92,35 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
             ConfigOptions.key("scan.partition.column")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("the column name used for partitioning 
the input.");
+                    .withDescription("The column name used for partitioning 
the input.");
     private static final ConfigOption<Integer> SCAN_PARTITION_NUM =
             ConfigOptions.key("scan.partition.num")
                     .intType()
                     .noDefaultValue()
-                    .withDescription("the number of partitions.");
+                    .withDescription("The number of partitions.");
     private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND =
             ConfigOptions.key("scan.partition.lower-bound")
                     .longType()
                     .noDefaultValue()
-                    .withDescription("the smallest value of the first 
partition.");
+                    .withDescription("The smallest value of the first 
partition.");
     private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND =
             ConfigOptions.key("scan.partition.upper-bound")
                     .longType()
                     .noDefaultValue()
-                    .withDescription("the largest value of the last 
partition.");
+                    .withDescription("The largest value of the last 
partition.");
     private static final ConfigOption<Integer> SCAN_FETCH_SIZE =
             ConfigOptions.key("scan.fetch-size")
                     .intType()
                     .defaultValue(0)
                     .withDescription(
-                            "gives the reader a hint as to the number of rows 
that should be fetched, from"
-                                    + " the database when reading per round 
trip. If the value specified is zero, then the hint is ignored. The"
-                                    + " default value is zero.");
+                            "Gives the reader a hint as to the number of rows 
that should be fetched "
+                                    + "from the database per round-trip when 
reading. "
+                                    + "If the value is zero, this hint is 
ignored.");
     private static final ConfigOption<Boolean> SCAN_AUTO_COMMIT =
             ConfigOptions.key("scan.auto-commit")
                     .booleanType()
                     .defaultValue(true)
-                    .withDescription(
-                            "sets whether the driver is in auto-commit mode. 
The default value is true, per"
-                                    + " the JDBC spec.");
+                    .withDescription("Sets whether the driver is in 
auto-commit mode.");
 
     // look up config options
     private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
@@ -130,19 +128,19 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                     .longType()
                     .defaultValue(-1L)
                     .withDescription(
-                            "the max number of rows of lookup cache, over this 
value, the oldest rows will "
+                            "The max number of rows of lookup cache, over this 
value, the oldest rows will "
                                     + "be eliminated. \"cache.max-rows\" and 
\"cache.ttl\" options must all be specified if any of them is "
-                                    + "specified. Cache is not enabled as 
default.");
+                                    + "specified.");
     private static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
             ConfigOptions.key("lookup.cache.ttl")
                     .durationType()
                     .defaultValue(Duration.ofSeconds(10))
-                    .withDescription("the cache time to live.");
+                    .withDescription("The cache time to live.");
     private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
             ConfigOptions.key("lookup.max-retries")
                     .intType()
                     .defaultValue(3)
-                    .withDescription("the max retry times if lookup database 
failed.");
+                    .withDescription("The max retry times if lookup database 
failed.");
 
     // write config options
     private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
@@ -150,20 +148,19 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                     .intType()
                     .defaultValue(100)
                     .withDescription(
-                            "the flush max size (includes all append, upsert 
and delete records), over this number"
-                                    + " of records, will flush data. The 
default value is 100.");
+                            "The flush max size (includes all append, upsert 
and delete records), over this number"
+                                    + " of records, will flush data.");
     private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
             ConfigOptions.key("sink.buffer-flush.interval")
                     .durationType()
                     .defaultValue(Duration.ofSeconds(1))
                     .withDescription(
-                            "the flush interval mills, over this time, 
asynchronous threads will flush data. The "
-                                    + "default value is 1s.");
+                            "The flush interval mills, over this time, 
asynchronous threads will flush data.");
     private static final ConfigOption<Integer> SINK_MAX_RETRIES =
             ConfigOptions.key("sink.max-retries")
                     .intType()
                     .defaultValue(3)
-                    .withDescription("the max retry times if writing records 
to database failed.");
+                    .withDescription("The max retry times if writing records 
to database failed.");
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 993af27..de8f451 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.Description;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -51,6 +52,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.configuration.description.TextElement.text;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.AT_LEAST_ONCE;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.EXACTLY_ONCE;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.NONE;
@@ -97,32 +99,39 @@ public class KafkaOptions {
                     .enumType(ValueFieldsStrategy.class)
                     .defaultValue(ValueFieldsStrategy.ALL)
                     .withDescription(
-                            "Defines a strategy how to deal with key columns 
in the data type of "
-                                    + "the value format. By default, '"
-                                    + ValueFieldsStrategy.ALL
-                                    + "' physical "
-                                    + "columns of the table schema will be 
included in the value format which "
-                                    + "means that key columns appear in the 
data type for both the key and value "
-                                    + "format.");
+                            String.format(
+                                    "Defines a strategy how to deal with key 
columns in the data type "
+                                            + "of the value format. By 
default, '%s' physical columns of the table schema "
+                                            + "will be included in the value 
format which means that the key columns "
+                                            + "appear in the data type for 
both the key and value format.",
+                                    ValueFieldsStrategy.ALL));
 
     public static final ConfigOption<String> KEY_FIELDS_PREFIX =
             ConfigOptions.key("key.fields-prefix")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Defines a custom prefix for all fields of the key 
format to avoid "
-                                    + "name clashes with fields of the value 
format. By default, the prefix is empty. "
-                                    + "If a custom prefix is defined, both the 
table schema and "
-                                    + "'"
-                                    + KEY_FIELDS.key()
-                                    + "' will work with prefixed names. When 
constructing "
-                                    + "the data type of the key format, the 
prefix will be removed and the "
-                                    + "non-prefixed names will be used within 
the key format. Please note that this "
-                                    + "option requires that '"
-                                    + VALUE_FIELDS_INCLUDE.key()
-                                    + "' must be '"
-                                    + ValueFieldsStrategy.EXCEPT_KEY
-                                    + "'.");
+                            Description.builder()
+                                    .text(
+                                            "Defines a custom prefix for all 
fields of the key format to avoid "
+                                                    + "name clashes with 
fields of the value format. "
+                                                    + "By default, the prefix 
is empty.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "If a custom prefix is 
defined, both the table schema and '%s' will work with prefixed names.",
+                                                    KEY_FIELDS.key()))
+                                    .linebreak()
+                                    .text(
+                                            "When constructing the data type 
of the key format, the prefix "
+                                                    + "will be removed and the 
non-prefixed names will be used within the key format.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "Please note that this 
option requires that '%s' must be '%s'.",
+                                                    VALUE_FIELDS_INCLUDE.key(),
+                                                    
ValueFieldsStrategy.EXCEPT_KEY))
+                                    .build());
 
     // 
--------------------------------------------------------------------------------------------
     // Kafka specific options
@@ -166,9 +175,16 @@ public class KafkaOptions {
                     .stringType()
                     .defaultValue("group-offsets")
                     .withDescription(
-                            "Optional startup mode for Kafka consumer, valid 
enumerations are "
-                                    + "\"earliest-offset\", \"latest-offset\", 
\"group-offsets\", \"timestamp\"\n"
-                                    + "or \"specific-offsets\"");
+                            Description.builder()
+                                    .text(
+                                            "Optional startup mode for Kafka 
consumer, valid enumerations are")
+                                    .list(
+                                            text("'earliest-offset'"),
+                                            text("'latest-offset'"),
+                                            text("'group-offsets'"),
+                                            text("'timestamp'"),
+                                            text("'specific-offsets'"))
+                                    .build());
 
     public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS =
             ConfigOptions.key("scan.startup.specific-offsets")
@@ -200,19 +216,30 @@ public class KafkaOptions {
                     .stringType()
                     .defaultValue("default")
                     .withDescription(
-                            "Optional output partitioning from Flink's 
partitions\n"
-                                    + "into Kafka's partitions valid 
enumerations are\n"
-                                    + "\"default\": (use kafka default 
partitioner to partition records),\n"
-                                    + "\"fixed\": (each Flink partition ends 
up in at most one Kafka partition),\n"
-                                    + "\"round-robin\": (a Flink partition is 
distributed to Kafka partitions round-robin when 'key.fields' is not 
specified.)\n"
-                                    + "\"custom class name\": (use a custom 
FlinkKafkaPartitioner subclass)");
+                            Description.builder()
+                                    .text(
+                                            "Optional output partitioning from 
Flink's partitions into Kafka's partitions. Valid enumerations are")
+                                    .list(
+                                            text(
+                                                    "'default' (use kafka 
default partitioner to partition records)"),
+                                            text(
+                                                    "'fixed' (each Flink 
partition ends up in at most one Kafka partition)"),
+                                            text(
+                                                    "'round-robin' (a Flink 
partition is distributed to Kafka partitions round-robin when 'key.fields' is 
not specified)"),
+                                            text(
+                                                    "custom class name (use 
custom FlinkKafkaPartitioner subclass)"))
+                                    .build());
 
     public static final ConfigOption<String> SINK_SEMANTIC =
             ConfigOptions.key("sink.semantic")
                     .stringType()
                     .defaultValue("at-least-once")
                     .withDescription(
-                            "Optional semantic when commit. Valid 
enumerationns are [\"at-least-once\", \"exactly-once\", \"none\"]");
+                            Description.builder()
+                                    .text(
+                                            "Optional semantic when 
committing. Valid enumerations are")
+                                    .list(text("at-least-once"), 
text("exactly-once"), text("none"))
+                                    .build());
 
     // Disable this feature by default
     public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
@@ -220,12 +247,19 @@ public class KafkaOptions {
                     .intType()
                     .defaultValue(0)
                     .withDescription(
-                            "The max size of buffered records before flush. "
-                                    + "When the sink receives many updates on 
the same key, the buffer will retain the last record of the same key. "
-                                    + "This can help to reduce data shuffling 
and avoid possible tombstone messages to Kafka topic."
-                                    + "Can be set to '0' to disable it."
-                                    + "Note both 'sink.buffer-flush.max-rows' 
and 'sink.buffer-flush.interval' "
-                                    + "must be set to be greater than zero to 
enable sink buffer flushing.");
+                            Description.builder()
+                                    .text(
+                                            "The max size of buffered records 
before flushing. "
+                                                    + "When the sink receives 
many updates on the same key, "
+                                                    + "the buffer will retain 
the last records of the same key. "
+                                                    + "This can help to reduce 
data shuffling and avoid possible tombstone messages to the Kafka topic.")
+                                    .linebreak()
+                                    .text("Can be set to '0' to disable it.")
+                                    .linebreak()
+                                    .text(
+                                            "Note both 
'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' "
+                                                    + "must be set to be 
greater than zero to enable sink buffer flushing.")
+                                    .build());
 
     // Disable this feature by default
     public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
@@ -233,12 +267,18 @@ public class KafkaOptions {
                     .durationType()
                     .defaultValue(Duration.ofSeconds(0))
                     .withDescription(
-                            "The flush interval mills, over this time, 
asynchronous threads will flush data. "
-                                    + "When the sink receives many updates on 
the same key, the buffer will retain the last record of the same key. "
-                                    + "This can help to reduce data shuffling 
and avoid possible tombstone messages to Kafka topic."
-                                    + "Can be set to '0' to disable it. "
-                                    + "Note both 'sink.buffer-flush.max-rows' 
and 'sink.buffer-flush.interval' "
-                                    + "must be set to be greater than zero to 
enable sink buffer flushing.");
+                            Description.builder()
+                                    .text(
+                                            "The flush interval millis. Over 
this time, asynchronous threads "
+                                                    + "will flush data. When 
the sink receives many updates on the same key, "
+                                                    + "the buffer will retain 
the last record of the same key.")
+                                    .linebreak()
+                                    .text("Can be set to '0' to disable it.")
+                                    .linebreak()
+                                    .text(
+                                            "Note both 
'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' "
+                                                    + "must be set to be 
greater than zero to enable sink buffer flushing.")
+                                    .build());
 
     private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
             
ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java
index 5e50a21..b30d401 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.Description;
 import org.apache.flink.streaming.connectors.kinesis.FixedKinesisPartitioner;
 import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
 import org.apache.flink.streaming.connectors.kinesis.RandomKinesisPartitioner;
@@ -40,6 +41,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
 /**
  * Options for Kinesis tables supported by the {@code CREATE TABLE ... WITH 
...} clause of the Flink
  * SQL dialect and the Flink Table API.
@@ -86,7 +90,7 @@ public class KinesisOptions {
             ConfigOptions.key("stream")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Name of the Kinesis stream backing this 
table (required)");
+                    .withDescription("Name of the Kinesis stream backing this 
table.");
 
     // 
-----------------------------------------------------------------------------------------
     // Sink specific options
@@ -97,26 +101,33 @@ public class KinesisOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Optional output partitioning from Flink's 
partitions into Kinesis shards. "
-                                    + "Sinks that write to tables defined with 
the PARTITION BY clause "
-                                    + "always use a field-based partitioner 
and cannot define this option. "
-                                    + "Valid enumerations are: \n"
-                                    + "\"random\":"
-                                    + " (use a random partition key),\n"
-                                    + "\"fixed\":"
-                                    + " (each Flink partition ends up in at 
most one Kinesis shard),\n"
-                                    + "\"custom class name\":"
-                                    + " (use a custom "
-                                    + KinesisPartitioner.class.getName()
-                                    + " subclass)");
+                            Description.builder()
+                                    .text(
+                                            "Optional output partitioning from 
Flink's partitions into Kinesis shards. "
+                                                    + "Sinks that write to 
tables defined with the %s clause always use a "
+                                                    + "field-based partitioner 
and cannot define this option.",
+                                            code("PARTITION BY"))
+                                    .linebreak()
+                                    .text("Valid enumerations are")
+                                    .list(
+                                            text("random (use a random 
partition key)"),
+                                            text(
+                                                    "fixed (each Flink 
partition ends up in at most one Kinesis shard)"),
+                                            text(
+                                                    "custom class name (use a 
custom %s subclass)",
+                                                    
text(KinesisPartitioner.class.getName())))
+                                    .build());
 
     public static final ConfigOption<String> SINK_PARTITIONER_FIELD_DELIMITER =
             ConfigOptions.key("sink.partitioner-field-delimiter")
                     .stringType()
                     .defaultValue("|")
                     .withDescription(
-                            "Optional field delimiter for fields-based 
partitioner "
-                                    + "derived from a PARTITION BY clause 
(\"|\" by default)");
+                            Description.builder()
+                                    .text(
+                                            "Optional field delimiter for 
fields-based partitioner derived from a %s clause",
+                                            code("PARTITION BY"))
+                                    .build());
 
     // 
-----------------------------------------------------------------------------------------
     // Option enumerations
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
index 0f985d6..bfc17e4 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
@@ -20,11 +20,13 @@ package org.apache.flink.table.filesystem;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /** This class holds configuration constants used by filesystem(Including 
hive) connector. */
 public class FileSystemOptions {
@@ -38,14 +40,13 @@ public class FileSystemOptions {
                     .defaultValue("__DEFAULT_PARTITION__")
                     .withDescription(
                             "The default partition name in case the dynamic 
partition"
-                                    + " column value is null/empty string");
+                                    + " column value is null/empty string.");
 
     public static final ConfigOption<MemorySize> SINK_ROLLING_POLICY_FILE_SIZE 
=
             key("sink.rolling-policy.file-size")
                     .memoryType()
                     .defaultValue(MemorySize.ofMebiBytes(128))
-                    .withDescription(
-                            "The maximum part file size before rolling (by 
default 128MB).");
+                    .withDescription("The maximum part file size before 
rolling.");
 
     public static final ConfigOption<Duration> 
SINK_ROLLING_POLICY_ROLLOVER_INTERVAL =
             key("sink.rolling-policy.rollover-interval")
@@ -53,7 +54,7 @@ public class FileSystemOptions {
                     .defaultValue(Duration.ofMinutes(30))
                     .withDescription(
                             "The maximum time duration a part file can stay 
open before rolling"
-                                    + " (by default 30 min to avoid to many 
small files). The frequency at which"
+                                    + " (by default long enough to avoid too 
many small files). The frequency at which"
                                     + " this is checked is controlled by the 
'sink.rolling-policy.check-interval' option.");
 
     public static final ConfigOption<Duration> 
SINK_ROLLING_POLICY_CHECK_INTERVAL =
@@ -71,28 +72,34 @@ public class FileSystemOptions {
                     .withDescription(
                             "The option to enable shuffle data by dynamic 
partition fields in sink"
                                     + " phase, this can greatly reduce the 
number of file for filesystem sink but may"
-                                    + " lead data skew, the default value is 
disabled.");
+                                    + " lead data skew.");
 
     public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
             key("streaming-source.enable")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Enable streaming source or not.\n"
-                                    + " NOTES: Please make sure that each 
partition/file should be written"
-                                    + " atomically, otherwise the reader may 
get incomplete data.");
+                            Description.builder()
+                                    .text("Enable streaming source or not.")
+                                    .linebreak()
+                                    .text(
+                                            " NOTES: Please make sure that 
each partition/file should be written"
+                                                    + " atomically, otherwise 
the reader may get incomplete data.")
+                                    .build());
 
     public static final ConfigOption<String> 
STREAMING_SOURCE_PARTITION_INCLUDE =
             key("streaming-source.partition.include")
                     .stringType()
                     .defaultValue("all")
                     .withDescription(
-                            "Option to set the partitions to read, the 
supported values "
-                                    + "are \"all\" and \"latest\","
-                                    + " the \"all\" means read all partitions; 
the \"latest\" means read latest "
-                                    + "partition in order of 
streaming-source.partition.order, the \"latest\" only works"
-                                    + " when the streaming hive source table 
used as temporal table. "
-                                    + "By default the option is \"all\".\n.");
+                            Description.builder()
+                                    .text(
+                                            "Option to set the partitions to 
read, supported values are")
+                                    .list(
+                                            text("all (read all partitions)"),
+                                            text(
+                                                    "latest (read latest 
partition in order of 'streaming-source.partition.order', this only works when 
a streaming Hive source table is used as a temporal table)"))
+                                    .build());
 
     public static final ConfigOption<Duration> 
STREAMING_SOURCE_MONITOR_INTERVAL =
             key("streaming-source.monitor-interval")
@@ -106,39 +113,47 @@ public class FileSystemOptions {
                     .defaultValue("partition-name")
                     .withDeprecatedKeys("streaming-source.consume-order")
                     .withDescription(
-                            "The partition order of streaming source,"
-                                    + " support \"create-time\", 
\"partition-time\" and \"partition-name\"."
-                                    + " \"create-time\" compares 
partition/file creation time, this is not the"
-                                    + " partition create time in Hive 
metaStore, but the folder/file modification"
-                                    + " time in filesystem, if the partition 
folder somehow gets updated,"
-                                    + " e.g. add new file into folder, it can 
affect how the data is consumed."
-                                    + " \"partition-time\" compares the time 
extracted from partition name."
-                                    + " \"partition-name\" compares partition 
name's alphabetical order."
-                                    + " This option is equality with 
deprecated option \"streaming-source.consume-order\".");
+                            Description.builder()
+                                    .text(
+                                            "The partition order of the 
streaming source, supported values are")
+                                    .list(
+                                            text(
+                                                    "create-time (compares 
partition/file creation time, which is not the partition creation time in the 
Hive metastore, "
+                                                            + "but the 
folder/file modification time in the filesystem; e.g., adding a new file into "
+                                                            + "the folder may 
affect how the data is consumed)"),
+                                            text(
+                                                    "partition-time (compares 
the time extracted from the partition name)"),
+                                            text(
+                                                    "partition-name (compares 
partition names lexicographically)"))
+                                    .text(
+                                            "This is a synonym for the 
deprecated 'streaming-source.consume-order' option.")
+                                    .build());
 
     public static final ConfigOption<String> 
STREAMING_SOURCE_CONSUME_START_OFFSET =
             key("streaming-source.consume-start-offset")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Start offset for streaming consuming."
-                                    + " How to parse and compare offsets 
depends on your order."
-                                    + " For create-time and partition-time, 
should be a timestamp"
-                                    + " string (yyyy-[m]m-[d]d [hh:mm:ss])."
-                                    + " For partition-time, will use partition 
time extractor to"
-                                    + " extract time from partition."
-                                    + " For partition-name, is the partition 
name string, e.g.:"
-                                    + " pt_year=2020/pt_mon=10/pt_day=01");
+                            Description.builder()
+                                    .text(
+                                            "Start offset for streaming 
consuming. How to parse and compare offsets depends on 
'streaming-source.partition-order'.")
+                                    .list(
+                                            text(
+                                                    "For 'create-time' and 
'partition-time' it should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss])."),
+                                            text(
+                                                    "For 'partition-time' it 
will use a partition time extractor to extract the time from the partition."),
+                                            text(
+                                                    "For 'partition-name' it 
is the name of the partition, e.g. 'pt_year=2020/pt_mon=10/pt_day=01'."))
+                                    .build());
 
     public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_KIND =
             key("partition.time-extractor.kind")
                     .stringType()
                     .defaultValue("default")
                     .withDescription(
-                            "Time extractor to extract time from partition 
values."
-                                    + " Support default and custom."
-                                    + " For default, can configure timestamp 
pattern."
-                                    + " For custom, should configure extractor 
class.");
+                            "Time extractor to extract time from partition 
values. "
+                                    + "This can either be 'default' or a 
custom extractor class. "
+                                    + "For 'default', you can configure a 
timestamp pattern.");
 
     public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS =
             key("partition.time-extractor.class")
@@ -152,43 +167,56 @@ public class FileSystemOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "The 'default' construction way allows users to 
use partition"
-                                    + " fields to get a legal timestamp 
pattern."
-                                    + " Default support 'yyyy-mm-dd hh:mm:ss' 
from first field."
-                                    + " If timestamp in partition is single 
field 'dt', can configure: '$dt'."
-                                    + " If timestamp in partition is year, 
month, day, hour,"
-                                    + " can configure: '$year-$month-$day 
$hour:00:00'."
-                                    + " If timestamp in partition is dt and 
hour, can configure: '$dt $hour:00:00'.");
+                            Description.builder()
+                                    .text(
+                                            "When 
'partition.time-extractor.kind' is set to 'default', "
+                                                    + "you can specify a 
pattern to get a timestamp from partitions.")
+                                    .list(
+                                            text(
+                                                    "By default, a format of 
'yyyy-mm-dd hh:mm:ss' is read from the first field."),
+                                            text(
+                                                    "If the timestamp in the 
partition is a single field called 'dt', you can use '$dt'."),
+                                            text(
+                                                    "If it is spread across 
multiple fields for year, month, day, and hour, you can use '$year-$month-$day 
$hour:00:00'."),
+                                            text(
+                                                    "If the timestamp is in 
fields dt and hour, you can use '$dt $hour:00:00'."))
+                                    .build());
 
     public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL =
             key("lookup.join.cache.ttl")
                     .durationType()
                     .defaultValue(Duration.ofMinutes(60))
                     .withDescription(
-                            "The cache TTL (e.g. 10min) for the build table in 
lookup join. "
-                                    + "By default the TTL is 60 minutes.");
+                            "The cache TTL (e.g. 10min) for the build table in 
lookup join.");
 
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_TRIGGER =
             key("sink.partition-commit.trigger")
                     .stringType()
                     .defaultValue("process-time")
                     .withDescription(
-                            "Trigger type for partition commit:\n"
-                                    + " 'process-time': based on the time of 
the machine, it neither requires"
-                                    + " partition time extraction nor 
watermark generation. Commit partition"
-                                    + " once the 'current system time' passes 
'partition creation system time' plus 'delay'.\n"
-                                    + " 'partition-time': based on the time 
that extracted from partition values,"
-                                    + " it requires watermark generation. 
Commit partition once the 'watermark'"
-                                    + " passes 'time extracted from partition 
values' plus 'delay'.");
+                            Description.builder()
+                                    .text("Trigger type for partition commit, 
supported values are")
+                                    .list(
+                                            text(
+                                                    "process-time (based on 
the time of the machine, requires "
+                                                            + "neither 
partition time extraction nor watermark generation; "
+                                                            + "commits 
partition once the current system time passes partition creation system time 
plus delay)"),
+                                            text(
+                                                    "partition-time (based on 
the time extracted from partition values, "
+                                                            + "requires 
watermark generation; commits partition once "
+                                                            + "the watermark 
passes the time extracted from partition values plus delay)"))
+                                    .build());
 
     public static final ConfigOption<Duration> SINK_PARTITION_COMMIT_DELAY =
             key("sink.partition-commit.delay")
                     .durationType()
                     .defaultValue(Duration.ofMillis(0))
                     .withDescription(
-                            "The partition will not commit until the delay 
time."
-                                    + " if it is a day partition, should be '1 
d',"
-                                    + " if it is a hour partition, should be 
'1 h'");
+                            Description.builder()
+                                    .text(
+                                            "The partition will not commit 
until the delay time. "
+                                                    + "The value should be '1 
d' for day partitions and '1 h' for hour partitions.")
+                                    .build());
 
     public static final ConfigOption<String> 
SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE =
             key("sink.partition-commit.watermark-time-zone")

Reply via email to