Airblader commented on a change in pull request #16142:
URL: https://github.com/apache/flink/pull/16142#discussion_r656189770



##########
File path: docs/content/docs/connectors/table/kafka.md
##########
@@ -193,10 +193,10 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required for sink</td>
+      <td>optional</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name(s) to read data from when the table is used as source. It 
also supports topic list for source by separating topic by semicolon like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. When the table is used as sink, the topic name is 
the topic to write data to. Note topic list is not supported for sinks.</td>
+      <td>Topic name(s) to read data from when the table is used as source. It 
also supports topic list for source by separating topic by semicolon like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. When the table is used as sink, the topic name is 
the topic to write data. If the "topic" option isn't specified for sink table, 
"topic" metadata column must be specified. If the "topic" option is specified 
for sink table, "topic" metadata column must be readable. Note topic list is 
not supported for sinks.</td>

Review comment:
       Please also update the documentation of `KafkaOptions#TOPIC` since the 
`ConfigOption` should be the source of truth for documentation.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -216,7 +216,7 @@
     protected final Properties producerConfig;
 
     /** The name of the default topic this producer is writing data to. */
-    protected final String defaultTopicId;
+    protected String defaultTopicId;

Review comment:
       Why can't this be `final` anymore? It's still entirely initialised 
within the constructor, isn't it?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,6 +167,15 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
+        // no explicit topic defined in table options, use the writable metadata column topic
+        topic =

Review comment:
       What happens / should happen if the topic returned by metadata differs 
between different elements? We're now essentially reading the metadata of 
whichever element comes first and then ignoring it for every later element, 
right? That isn't the behavior I would expect given the documentation.
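
A minimal sketch of the per-element behavior described above, assuming the topic should be resolved for every row rather than cached from the first one. `TopicResolver` and the `Map`-based row metadata are hypothetical stand-ins for illustration, not the actual Flink classes:

```java
import java.util.Map;

// Hypothetical stand-in for the serialization schema; not the actual Flink class.
final class TopicResolver {
    // Topic from the table options, or null when the 'topic' option is absent.
    private final String configuredTopic;

    TopicResolver(String configuredTopic) {
        this.configuredTopic = configuredTopic;
    }

    // Resolves the topic for a single row without mutating any field,
    // so a later row carrying a different metadata topic is routed correctly.
    String getTargetTopic(Map<String, String> rowMetadata) {
        if (configuredTopic != null) {
            return configuredTopic;
        }
        String fromRow = rowMetadata.get("topic");
        if (fromRow == null) {
            throw new IllegalArgumentException(
                    "Row has no 'topic' metadata and no 'topic' option is set.");
        }
        return fromRow;
    }
}
```

Because nothing is assigned back to a field, two consecutive rows with different metadata topics each go to their own topic.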

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -141,7 +142,7 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
         }
 
         return new ProducerRecord<>(
-                topic,
+                getTargetTopic(consumedRow),

Review comment:
       Wouldn't this also be necessary in the shortcut path at the beginning of 
this method?

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
##########
@@ -300,6 +306,240 @@ public void testKafkaTableWithMultipleTopics() throws Exception {
         topics.forEach(super::deleteTestTopic);
     }
 
+    @Test
+    public void testKafkaSinkWithMetadataIncludeTopicOption() {
+        if (isLegacyConnector) {
+            return;
+        }
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "metadata_topic_" + format;
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        String groupId = getStandardProps().getProperty("group.id");
+        String bootstraps = getBootstrapServers();
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE kafka (\n"
+                                + "  `physical_1` STRING,\n"
+                                + "  `physical_2` INT,\n"
+                                // metadata fields are out of order on purpose
+                                // offset is ignored because it might not be deterministic
+                                + "  `timestamp-type` STRING METADATA VIRTUAL,\n"
+                                + "  `timestamp` TIMESTAMP(3) METADATA,\n"
+                                + "  `leader-epoch` INT METADATA VIRTUAL,\n"
+                                + "  `headers` MAP<STRING, BYTES> METADATA,\n"
+                                + "  `partition` INT METADATA VIRTUAL,\n"
+                                + "  `topic` STRING METADATA,\n"
+                                + "  `physical_3` BOOLEAN\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'properties.group.id' = '%s',\n"
+                                + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                + "  %s\n"
+                                + ")",
+                        topic, bootstraps, groupId, formatOptions());
+
+        tEnv.executeSql(createTable);
+
+        String initialValues =
+                String.format(
+                        "INSERT INTO kafka\n"
+                                + "VALUES\n"
+                                + " ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', X'C0FFEE', 'k2', X'BABE'], '%s', TRUE),\n"
+                                + " ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), '%s', FALSE),\n"
+                                + " ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], '%s', TRUE)",
+                        topic, topic, topic);
+        try {
+            tEnv.executeSql(initialValues).await();
+            fail(
+                    "Unable to create the Kafka sink table with table option 'topic' and metadata column 'topic'.");
+        } catch (Exception e) {
+            assertTrue(e instanceof ValidationException);
+            assertEquals(
+                    String.format(
+                            "Invalid metadata key '%s' in column 'topic' of table 'default_catalog.default_database.kafka'. "
+                                    + "The %s class '%s' supports the following metadata keys for writing:\n%s",
+                            TOPIC.key,
+                            DynamicTableSink.class.getSimpleName(),
+                            KafkaDynamicSink.class.getName(),
+                            String.join("\n", Arrays.asList(HEADERS.key, TIMESTAMP.key))),
+                    e.getMessage());
+        }
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);

Review comment:
       We should make sure this is always called once `createTestTopic` has 
been called. Right now there are a few things in between that could throw 
exceptions and prevent this from being called.
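
One way to guarantee the cleanup is a `try`/`finally` around everything that follows topic creation. The sketch below is only an illustration under that assumption; `TopicCleanupSketch` and its recording `createTestTopic`/`deleteTestTopic` methods are hypothetical and merely mimic the test base class:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical harness; the two topic methods only record that they were called.
final class TopicCleanupSketch {
    final List<String> log = new ArrayList<>();

    void createTestTopic(String topic) {
        log.add("create " + topic);
    }

    void deleteTestTopic(String topic) {
        log.add("delete " + topic);
    }

    // Runs the test body and deletes the topic even if the body throws.
    void runTest(String topic, Runnable body) {
        createTestTopic(topic);
        try {
            body.run();
        } finally {
            deleteTestTopic(topic);
        }
    }
}
```

In the actual test, the table creation, the `INSERT`, and the assertions would all sit inside the `try` block.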

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -361,18 +360,25 @@ public static void validateSourceTopic(ReadableConfig tableOptions) {
     }
 
     public static void validateSinkTopic(ReadableConfig tableOptions) {
-        String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but got %s: %s.";
-        if (!isSingleTopic(tableOptions)) {
-            if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
+        if (tableOptions.getOptional(TOPIC).isPresent()) {
+            if (tableOptions.get(TOPIC).isEmpty()) {
                 throw new ValidationException(
                         String.format(
-                                errorMessageTemp,
-                                "'topic-pattern'",
-                                tableOptions.get(TOPIC_PATTERN)));
-            } else {
-                throw new ValidationException(
-                        String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC)));
+                                "Flink Kafka sink doesn't support empty topic, but got %s: %s.",
+                                "'topic'", tableOptions.get(TOPIC)));

Review comment:
       nit: it isn't really necessary to call `tableOptions.get(TOPIC)` here 
again since this branch is reached exactly when the list is empty.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -187,6 +188,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
+        // validate the table option 'topic' and 'topic-pattern'
+        validateSinkTopic(Configuration.fromMap(context.getCatalogTable().getOptions()));

Review comment:
       Use `helper.getOptions()` (already stored in a variable `tableOptions` 
below) instead of building another `Configuration` here? Is there a reason to 
have this at the very top and not, e.g., where `validateTableSinkOptions` is 
called?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -166,6 +167,15 @@ public void setPartitions(int[] partitions) {
 
     @Override
     public String getTargetTopic(RowData element) {
+        // no explicit topic defined in table options, use the writable metadata column topic
+        topic =
+                StringUtils.isNullOrWhitespaceOnly(topic)

Review comment:
       I think it would be better to ensure that a whitespace-only value is 
mapped to null before it is assigned to `topic`, rather than checking it here.
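
A rough sketch of normalizing once at assignment time, so that later code only needs a null check. `TopicNormalizer.normalizeTopic` is a hypothetical helper written for illustration and does not exist in the PR:

```java
// Hypothetical helper; shown only to illustrate normalizing at assignment time.
final class TopicNormalizer {
    private TopicNormalizer() {}

    // Returns null for null, empty, or whitespace-only input; otherwise the input unchanged.
    static String normalizeTopic(String rawTopic) {
        if (rawTopic == null || rawTopic.chars().allMatch(Character::isWhitespace)) {
            return null;
        }
        return rawTopic;
    }
}
```

The constructor could then assign `normalizeTopic(topic)` once, and `getTargetTopic` would only need to compare the field against null.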





