rich-c-shop commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1694043742


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -251,6 +251,8 @@ public DataStreamSink<?> consumeDataStream(
     public Map<String, DataType> listWritableMetadata() {
         final Map<String, DataType> metadataMap = new LinkedHashMap<>();
         Stream.of(WritableMetadata.values())
+                // When `topic` is a singleton list, TOPIC metadata is not 
writable

Review Comment:
   will there be any warning or error message? looks like it just silently 
ignore TOPIC metadata when `topic` is a singleton list



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -144,14 +147,36 @@ public void open(
         valueSerialization.open(context);
     }
 
+    private String getTargetTopic(RowData element) {
+        if (topics != null && topics.size() == 1) {
+            // If topics is a singleton list, we only return the provided 
topic.
+            return topics.get(0);
+        }
+        final String topic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+        if (topic == null && topics == null) {

Review Comment:
   how about the case `topic == null && topics != null && topics.size() == 0`?



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java:
##########
@@ -118,8 +118,8 @@ public class KafkaConnectorOptions {
                     .asList()
                     .noDefaultValue()
                     .withDescription(
-                            "Topic names from which the table is read. Either 
'topic' or 'topic-pattern' must be set for source. "
-                                    + "Option 'topic' is required for sink.");
+                            "Topic name(s) to read data from when the table is 
used as source. It also supports topic list for source by separating topic by 
semicolon like 'topic-1;topic-2'. Note, only one of 'topic-pattern' and 'topic' 
can be specified for sources. "
+                                    + "When the table is used as sink, the 
topic name is the topic to write data. It also supports topic list for sinks. 
The provided topic-list is treated as a allow list of valid values for the 
`topic` metadata column, if not provided, any value is valid. If the 'topic' 
option isn't specified, or a list is provided, for sink table, 'topic' metadata 
column is writable and must be specified.");

Review Comment:
   > if not provided, any value is valid
   
   how about supporting `topic-pattern` in sink as well? This would align with 
source and `*` wildcard pattern stands for "any value is valid".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to