AHeise commented on code in PR #109:
URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1728475076


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java:
##########
@@ -188,6 +188,120 @@ public void testKafkaSourceSink() throws Exception {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testKafkaSourceSinkWithTopicList() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
+        final String topic2 = "topics2_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic2, 1, 1);
+        createTestTopic(topic1, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        String groupId = getStandardProps().getProperty("group.id");
+        String bootstraps = getBootstrapServers();
+        final String createTable =
+                String.format(
+                        "CREATE TABLE kafka (\n"
+                                + "  `topic` STRING METADATA,\n"
+                                + "  `user_id` INT,\n"
+                                + "  `item_id` INT,\n"
+                                + "  `behavior` STRING\n"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'topic' = '%s;%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'properties.group.id' = '%s',\n"
+                                + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                + "  %s\n"
+                                + ")\n",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic1,
+                        topic2,
+                        bootstraps,
+                        groupId,
+                        formatOptions());
+
+        tEnv.executeSql(createTable);
+
+        List<Row> values =
+                Arrays.asList(
+                        Row.of(topic1, 1, 1102, "behavior 1"),
+                        Row.of(topic2, 2, 1103, "behavior 2"));
+        tEnv.fromValues(values).insertInto("kafka").execute().await();
+
+        // ---------- Consume stream from Kafka -------------------
+
+        List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
+
+        assertThat(results)
+                .containsExactly(
+                        Row.of(topic1, 1, 1102, "behavior 1"),
+                        Row.of(topic2, 2, 1103, "behavior 2"));

Review Comment:
   This assertion is too unspecific for me: the records could all have been written to only one of the topics.
   
   I'd add a second, finer-grained assertion using `drainAllRecordsFromTopic`, where you really check that the records were written to their respective topics correctly.
   
   By the way, for this assertion you could also simply use `assertThat(results).containsExactlyInAnyOrder(values)`. Note the `InAnyOrder`: we have no guarantee that the order is maintained.
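   A minimal, self-contained sketch of the multiset comparison that an order-insensitive assertion like AssertJ's `containsExactlyInAnyOrder` performs (for a `List` argument, AssertJ's `containsExactlyInAnyOrderElementsOf` is the matching variant). Class and method names below are illustrative, not part of the PR:
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class InAnyOrderCheck {
   
       // Order-insensitive multiset equality: same elements with the same
       // multiplicities, regardless of the order Kafka delivers them in.
       static <T> boolean sameElementsAnyOrder(List<T> actual, List<T> expected) {
           Map<T, Integer> counts = new HashMap<>();
           for (T t : actual) {
               counts.merge(t, 1, Integer::sum);
           }
           for (T t : expected) {
               Integer c = counts.get(t);
               if (c == null) {
                   return false; // expected element missing from actual
               }
               if (c == 1) {
                   counts.remove(t);
               } else {
                   counts.put(t, c - 1);
               }
           }
           return counts.isEmpty(); // no unexpected extras left over
       }
   
       public static void main(String[] args) {
           List<String> produced = Arrays.asList("topic1:1102", "topic2:1103");
           List<String> consumed = Arrays.asList("topic2:1103", "topic1:1102");
           System.out.println(sameElementsAnyOrder(consumed, produced));   // true
           System.out.println(
                   sameElementsAnyOrder(consumed, Arrays.asList("topic1:1102"))); // false
       }
   }
   ```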



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -636,21 +618,25 @@ public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
     private static Map<String, String> autoCompleteSchemaRegistrySubject(
             Map<String, String> options) {
         Configuration configuration = Configuration.fromMap(options);
-        // the subject autoComplete should only be used in sink, check the topic first
-        validateSinkTopic(configuration);
-        final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
-        final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
-        final Optional<String> format = configuration.getOptional(FORMAT);
-        final String topic = configuration.get(TOPIC).get(0);
-
-        if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
-            autoCompleteSubject(configuration, format.get(), topic + "-value");
-        } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
-            autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
-        }
+        // the subject autoComplete should only be used in sink with a single topic, check the topic
Review Comment:
   Okay, in this case we probably need to autoComplete dynamically, and I can see that this is a bigger change.
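   
   A hypothetical sketch of what per-topic subject derivation could look like with a topic list, assuming the default Schema Registry `TopicNameStrategy` naming (`<topic>-value`), which matches the `topic + "-value"` pattern in the existing code. Class and method names are illustrative only:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   public class SubjectNames {
   
       // With a topic list there is no single subject to auto-complete,
       // so each topic would need its own "<topic>-value" subject.
       static List<String> valueSubjects(List<String> topics) {
           List<String> subjects = new ArrayList<>();
           for (String topic : topics) {
               subjects.add(topic + "-value");
           }
           return subjects;
       }
   
       public static void main(String[] args) {
           System.out.println(valueSubjects(Arrays.asList("topics1", "topics2")));
           // [topics1-value, topics2-value]
       }
   }
   ```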



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
