twalthr commented on a change in pull request #15764:
URL: https://github.com/apache/flink/pull/15764#discussion_r620322217



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -97,32 +99,38 @@ private KafkaOptions() {}
                     .enumType(ValueFieldsStrategy.class)
                     .defaultValue(ValueFieldsStrategy.ALL)
                     .withDescription(
-                            "Defines a strategy how to deal with key columns in the data type of "
-                                    + "the value format. By default, '"
-                                    + ValueFieldsStrategy.ALL
-                                    + "' physical "
-                                    + "columns of the table schema will be included in the value format which "
-                                    + "means that key columns appear in the data type for both the key and value "
-                                    + "format.");
+                            String.format(
+                                    "Defines a strategy how to deal with key columns in the data type "
+                                            + "of the value format. By default, '%s' physical columns of the table schema "
+                                            + "will be included in the value format which means that the key columns "
+                                            + "appear in the data type for both the key and value format.",
+                                    ValueFieldsStrategy.ALL));
 
     public static final ConfigOption<String> KEY_FIELDS_PREFIX =
             ConfigOptions.key("key.fields-prefix")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Defines a custom prefix for all fields of the key format to avoid "
-                                    + "name clashes with fields of the value format. By default, the prefix is empty. "
-                                    + "If a custom prefix is defined, both the table schema and "
-                                    + "'"
-                                    + KEY_FIELDS.key()
-                                    + "' will work with prefixed names. When constructing "
-                                    + "the data type of the key format, the prefix will be removed and the "
-                                    + "non-prefixed names will be used within the key format. Please note that this "
-                                    + "option requires that '"
-                                    + VALUE_FIELDS_INCLUDE.key()
-                                    + "' must be '"
-                                    + ValueFieldsStrategy.EXCEPT_KEY
-                                    + "'.");
+                            Description.builder()
+                                    .text(
+                                            "Defines a custom prefix for all fields of the key format to avoid "

Review comment:
       I would keep `By default, the prefix is empty.` here, because this is not obvious: no default value is defined for this option.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -200,45 +215,69 @@ private KafkaOptions() {}
                     .stringType()
                     .defaultValue("default")
                     .withDescription(
-                            "Optional output partitioning from Flink's partitions\n"
-                                    + "into Kafka's partitions valid enumerations are\n"
-                                    + "\"default\": (use kafka default partitioner to partition records),\n"
-                                    + "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n"
-                                    + "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified.)\n"
-                                    + "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
+                            Description.builder()
+                                    .text(
+                                            "Optional output partitioning from Flink's partitions into Kafka's partitions. Valid enumerations are")
+                                    .list(
+                                            text(
+                                                    "default (use kafka default partitioner to partition records)"),

Review comment:
       maybe still surround it by `'...'` here and below
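
       For illustration only (not part of the original review), the quoted variant suggested here might look like the following, assuming Flink's `Description`/`TextElement` builder API from `org.apache.flink.configuration.description` and a static import of `TextElement.text`:

```java
// Sketch of the suggested change: enum values wrapped in '...'.
// Assumes: import org.apache.flink.configuration.description.Description;
//          import static org.apache.flink.configuration.description.TextElement.text;
Description.builder()
        .text(
                "Optional output partitioning from Flink's partitions into Kafka's partitions. Valid enumerations are")
        .list(
                text("'default' (use kafka default partitioner to partition records)"),
                text("'fixed' (each Flink partition ends up in at most one Kafka partition)"),
                text(
                        "'round-robin' (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified)"),
                text("custom class name (use a custom FlinkKafkaPartitioner subclass)"))
        .build();
```

       Whether the free-form "custom class name" entry should also be quoted is a judgment call, since it is a placeholder rather than a literal value.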

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -166,9 +174,16 @@ private KafkaOptions() {}
                     .stringType()
                     .defaultValue("group-offsets")
                     .withDescription(
-                            "Optional startup mode for Kafka consumer, valid enumerations are "
-                                    + "\"earliest-offset\", \"latest-offset\", \"group-offsets\", \"timestamp\"\n"
-                                    + "or \"specific-offsets\"");
+                            Description.builder()
+                                    .text(
+                                            "Optional startup mode for Kafka consumer, valid enumerations are")
+                                    .list(
+                                            text("earliest-offset"),

Review comment:
       maybe still surround it by `'...'`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
