This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0c34c994f9906e58963f85739fc951221b11d26a Author: slinkydeveloper <[email protected]> AuthorDate: Thu Jan 6 16:20:12 2022 +0100 [FLINK-25391][connector-kafka] Forward catalog table options --- docs/content/docs/connectors/table/kafka.md | 24 +++++++++++++++++++++- .../kafka/table/KafkaDynamicTableFactory.java | 23 +++++++++++++-------- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md index 6b728f6..34d73c4 100644 --- a/docs/content/docs/connectors/table/kafka.md +++ b/docs/content/docs/connectors/table/kafka.md @@ -179,15 +179,17 @@ Connector Options <tr> <th class="text-left" style="width: 25%">Option</th> <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 8%">Forwarded</th> <th class="text-center" style="width: 7%">Default</th> <th class="text-center" style="width: 10%">Type</th> - <th class="text-center" style="width: 50%">Description</th> + <th class="text-center" style="width: 42%">Description</th> </tr> </thead> <tbody> <tr> <td><h5>connector</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify what connector to use, for Kafka use <code>'kafka'</code>.</td> @@ -195,6 +197,7 @@ Connector Options <tr> <td><h5>topic</h5></td> <td>required for sink</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. 
Note topic list is not supported for sinks.</td> @@ -202,6 +205,7 @@ Connector Options <tr> <td><h5>topic-pattern</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources.</td> @@ -209,6 +213,7 @@ Connector Options <tr> <td><h5>properties.bootstrap.servers</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Comma separated list of Kafka brokers.</td> @@ -216,6 +221,7 @@ Connector Options <tr> <td><h5>properties.group.id</h5></td> <td>optional for source, not applicable for sink</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The id of the consumer group for Kafka source. If group ID is not specified, an automatically generated id "KafkaSource-{tableIdentifier}" will be used.</td> @@ -223,6 +229,7 @@ Connector Options <tr> <td><h5>properties.*</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td> @@ -232,6 +239,7 @@ Connector Options <tr> <td><h5>format</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The format used to deserialize and serialize the value part of Kafka messages. @@ -243,6 +251,7 @@ Connector Options <tr> <td><h5>key.format</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The format used to deserialize and serialize the key part of Kafka messages. 
@@ -254,6 +263,7 @@ Connector Options <tr> <td><h5>key.fields</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">[]</td> <td>List<String></td> <td>Defines an explicit list of physical columns from the table schema that configure the data @@ -264,6 +274,7 @@ Connector Options <tr> <td><h5>key.fields-prefix</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Defines a custom prefix for all fields of the key format to avoid name clashes with fields @@ -277,6 +288,7 @@ Connector Options <tr> <td><h5>value.format</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The format used to deserialize and serialize the value part of Kafka messages. @@ -288,6 +300,7 @@ Connector Options <tr> <td><h5>value.fields-include</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">ALL</td> <td><p>Enum</p>Possible values: [ALL, EXCEPT_KEY]</td> <td>Defines a strategy how to deal with key columns in the data type of the value format. By @@ -298,6 +311,7 @@ Connector Options <tr> <td><h5>scan.startup.mode</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">group-offsets</td> <td>String</td> <td>Startup mode for Kafka consumer, valid values are <code>'earliest-offset'</code>, <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>. @@ -306,6 +320,7 @@ Connector Options <tr> <td><h5>scan.startup.specific-offsets</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> startup mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'</code>. 
@@ -314,6 +329,7 @@ Connector Options <tr> <td><h5>scan.startup.timestamp-millis</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Long</td> <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td> @@ -321,6 +337,7 @@ Connector Options <tr> <td><h5>scan.topic-partition-discovery.interval</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> <td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically.</td> @@ -328,6 +345,7 @@ Connector Options <tr> <td><h5>sink.partitioner</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">'default'</td> <td>String</td> <td>Output partitioning from Flink's partitions into Kafka's partitions. Valid values are @@ -343,6 +361,7 @@ Connector Options <tr> <td><h5>sink.semantic</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">at-least-once</td> <td>String</td> <td>Deprecated: Please use <code>sink.delivery-guarantee</code>.</td> @@ -350,6 +369,7 @@ Connector Options <tr> <td><h5>sink.delivery-guarantee</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">at-least-once</td> <td>String</td> <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details. 
</td> @@ -357,6 +377,7 @@ Connector Options <tr> <td><h5>sink.transactional-id-prefix</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>If the delivery guarantee is configured as <code>'exactly-once'</code> this value must be set and is used as a prefix for the identifier of all opened Kafka transactions.</td> @@ -364,6 +385,7 @@ Connector Options <tr> <td><h5>sink.parallelism</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td> diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index 290f396..5ac146d 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -60,6 +60,8 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; @@ -146,14 +148,19 @@ public class KafkaDynamicTableFactory @Override public Set<ConfigOption<?>> forwardOptions() { - final Set<ConfigOption<?>> options = new HashSet<>(); - options.add(TOPIC); - options.add(TOPIC_PATTERN); - options.add(SCAN_STARTUP_MODE); - 
options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); - options.add(SCAN_TOPIC_PARTITION_DISCOVERY); - options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); - return options; + return Stream.of( + PROPS_BOOTSTRAP_SERVERS, + PROPS_GROUP_ID, + TOPIC, + TOPIC_PATTERN, + SCAN_STARTUP_MODE, + SCAN_STARTUP_SPECIFIC_OFFSETS, + SCAN_TOPIC_PARTITION_DISCOVERY, + SCAN_STARTUP_TIMESTAMP_MILLIS, + SINK_PARTITIONER, + SINK_PARALLELISM, + TRANSACTIONAL_ID_PREFIX) + .collect(Collectors.toSet()); } @Override
