twalthr commented on a change in pull request #16142:
URL: https://github.com/apache/flink/pull/16142#discussion_r657870086



##########
File path: docs/content/docs/connectors/table/kafka.md
##########
@@ -193,10 +193,10 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required for sink</td>
+      <td>optional</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name(s) to read data from when the table is used as source. It 
also supports topic list for source by separating topic by semicolon like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. When the table is used as sink, the topic name is 
the topic to write data to. Note topic list is not supported for sinks.</td>
+      <td>Topic name(s) to read data from when the table is used as source. It 
also supports topic list for source by separating topic by semicolon like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. When the table is used as sink, the topic name is 
the topic to write data. If the "topic" option isn't specified for sink table, 
"topic" metadata column must be specified. If the "topic" option is specified 
for sink table, "topic" metadata column must be readable. Note topic list is 
not supported for sinks.</td>
     </tr>
     <tr>

Review comment:
       I wasn't aware that we already support lists in the topic option. 
Wouldn't it be useful to support this option to limit the set of topics 
available through the dynamic metadata column? I think this is a very common 
use case, because data platform teams usually only allow a fixed set of topics 
to be writable.
   
   btw `"topic" metadata column must be readable` -> `"topic" metadata column 
can only be readable`

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -145,7 +145,7 @@ public KafkaDynamicSink(
         // Mutable attributes
         this.metadataKeys = Collections.emptyList();
         // Kafka-specific attributes
-        this.topic = checkNotNull(topic, "Topic must not be null.");
+        this.topic = topic;

Review comment:
       mark nullability in parameter using `@Nullable`
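
   The suggested change could look like the following self-contained sketch. A 
stand-in `@Nullable` annotation is declared locally so the snippet compiles on 
its own; the real code would import the annotation already used elsewhere in 
the codebase, and the class name here is illustrative:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

public class KafkaDynamicSinkSketch {

    // Local stand-in so the sketch is self-contained; the actual code would
    // import the @Nullable annotation used across the Flink codebase.
    @Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
    @interface Nullable {}

    // The field may now legitimately be null when no 'topic' option is set.
    private final @Nullable String topic;

    // Nullability is documented in the signature instead of being rejected
    // with checkNotNull, as suggested in the review.
    public KafkaDynamicSinkSketch(@Nullable String topic) {
        this.topic = topic;
    }

    public @Nullable String getTopic() {
        return topic;
    }
}
```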

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -216,7 +216,7 @@
     protected final Properties producerConfig;
 
     /** The name of the default topic this producer is writing data to. */
-    protected final String defaultTopicId;
+    protected String defaultTopicId;

Review comment:
       In any case, we should mark the parameter as `@Nullable` and update the 
JavaDocs.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,7 +167,16 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
-        return topic;
+        if (!StringUtils.isNullOrWhitespaceOnly(topic)) {

Review comment:
       This is a very expensive operation that is executed for every record on 
the hot path. I don't think we need this whitespace logic at all; we can rely 
on the user to specify the topic correctly.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,7 +167,16 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
-        return topic;
+        if (!StringUtils.isNullOrWhitespaceOnly(topic)) {
+            return topic;
+        }
+        // no explicit topic defined in table options, use the writable 
metadata column topic
+        final String topicMetadata = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+        if (StringUtils.isNullOrWhitespaceOnly(topicMetadata)) {
+            throw new NullPointerException(

Review comment:
       use a different kind of exception, maybe `RuntimeException`
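
   Combined with the hot-path comment above, the resolution logic might look 
like the following standalone sketch: a plain null check per record instead of 
a whitespace scan, and a `RuntimeException` instead of a 
`NullPointerException`. Class and method names are illustrative; the real 
implementation would obtain `topicMetadata` via `readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC)`, whereas here it is passed in 
directly:

```java
class TargetTopicSketch {

    private final String topic; // null when the 'topic' option is not set

    TargetTopicSketch(String topic) {
        this.topic = topic;
    }

    String getTargetTopic(String topicMetadata) {
        if (topic != null) { // cheap reference check on the per-record hot path
            return topic;
        }
        if (topicMetadata == null) {
            // RuntimeException rather than NullPointerException, as suggested
            throw new RuntimeException(
                    "Neither the 'topic' option nor the 'topic' metadata column is set.");
        }
        return topicMetadata;
    }
}
```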

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -224,7 +224,7 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
                 keyProjection,
                 valueProjection,
                 keyPrefix,
-                tableOptions.get(TOPIC).get(0),
+                tableOptions.getOptional(TOPIC).isPresent() ? 
tableOptions.get(TOPIC).get(0) : null,

Review comment:
       This could cause an index out of bounds exception if the list is empty?
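
   A guard against the empty-list case could look like this sketch. The helper 
name is hypothetical; the actual factory code would apply the same check to 
the list obtained from `tableOptions.getOptional(TOPIC)`:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper mirroring the suggested fix: only take the first
// topic when the option is present AND the list is non-empty, otherwise
// fall back to null instead of risking an IndexOutOfBoundsException.
class TopicOptionSketch {

    static String firstTopicOrNull(Optional<List<String>> topics) {
        return topics
                .filter(list -> !list.isEmpty())
                .map(list -> list.get(0))
                .orElse(null);
    }
}
```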

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -41,7 +42,7 @@
 
     private final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
 
-    private final String topic;
+    private String topic;

Review comment:
       nit: why is it not final anymore? I cannot see any other mutation in 
your changes.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -355,6 +356,21 @@ public int hashCode() {
     // 
--------------------------------------------------------------------------------------------
 
     enum WritableMetadata {
+        TOPIC(
+                "topic",
+                DataTypes.STRING().nullable(),

Review comment:
       Shouldn't it be `notNull()` if we otherwise throw an exception at 
runtime?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -144,7 +144,7 @@ private KafkaOptions() {}
                     .noDefaultValue()
                     .withDescription(
                             "Topic names from which the table is read. Either 
'topic' or 'topic-pattern' must be set for source. "
-                                    + "Option 'topic' is required for sink.");
+                                    + "Option 'topic' is optional for sink 
through specifying the 'topic' metadata column.");

Review comment:
       This still seems to be different from the docs. Please generate the docs 
from the options using `ConfigOptionDocGenerator`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -216,7 +216,7 @@
     protected final Properties producerConfig;
 
     /** The name of the default topic this producer is writing data to. */
-    protected final String defaultTopicId;
+    protected String defaultTopicId;

Review comment:
       @AHeise you are more familiar with the Kafka producer. Is this a valid 
change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

