wuchong commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r460723655



##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka010DynamicSource(
 
        @Override
        protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       String topic,
+                       List<String> topics,
                        Properties properties,
                        DeserializationSchema<RowData> deserializationSchema) {
-               return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+               return new FlinkKafkaConsumer010<>(topics, deserializationSchema, properties);
+       }
+
+       @Override
+       protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+               Pattern pattern,
+               Properties properties,
+               DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -161,11 +176,22 @@ private KafkaOptions() {}
        // --------------------------------------------------------------------------------------------
 
        public static void validateTableOptions(ReadableConfig tableOptions) {
+               validateTopic(tableOptions);
                validateScanStartupMode(tableOptions);
                validateSinkPartitioner(tableOptions);
                validateSinkSemantic(tableOptions);
        }
 
+       public static void validateTopic(ReadableConfig tableOptions) {
+               Optional<String> topic = tableOptions.getOptional(TOPIC);
+               Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
+
+               if ((topic.isPresent() && pattern.isPresent()) || !(topic.isPresent() || pattern.isPresent())) {

Review comment:
       Split this into two exceptions. 
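       A rough sketch of how the two checks could look (a fragment assuming the existing `TOPIC`/`TOPIC_PATTERN` options and `ValidationException` already available in `KafkaOptions`; the messages are only illustrative):

   ```java
   public static void validateTopic(ReadableConfig tableOptions) {
           Optional<String> topic = tableOptions.getOptional(TOPIC);
           Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);

           if (topic.isPresent() && pattern.isPresent()) {
                   // both options set: ambiguous source configuration
                   throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
           }
           if (!topic.isPresent() && !pattern.isPresent()) {
                   // neither option set: nothing to consume from
                   throw new ValidationException("Either 'topic' or 'topic-pattern' option is required.");
           }
   }
   ```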

##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka010DynamicSource(
 
        @Override
        protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       String topic,
+                       List<String> topics,
                        Properties properties,
                        DeserializationSchema<RowData> deserializationSchema) {
-               return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+               return new FlinkKafkaConsumer010<>(topics, deserializationSchema, properties);
+       }
+
+       @Override
+       protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+               Pattern pattern,

Review comment:
       topicPattern

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -164,34 +178,44 @@ public int hashCode() {
        /**
         * Creates a version-specific Kafka consumer.
         *
-        * @param topic                 Kafka topic to consume.
+        * @param topics                Kafka topic to consume.
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema to use for Kafka records.
         * @return The version-specific Kafka consumer
         */
        protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       String topic,
+                       List<String> topics,
                        Properties properties,
                        DeserializationSchema<RowData> deserializationSchema);
 
+       /**
+        * Creates a version-specific Kafka consumer.
+        *
+        * @param topicPattern          afka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka records.
+        * @return The version-specific Kafka consumer
+        */
+       protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+               Pattern topicPattern,
+               Properties properties,
+               DeserializationSchema<RowData> deserializationSchema);

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -93,16 +101,21 @@
         *                               mode is {@link StartupMode#TIMESTAMP}.
         */
        protected KafkaDynamicSourceBase(
-                       DataType outputDataType,
-                       String topic,
-                       Properties properties,
-                       DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets,
-                       long startupTimestampMillis) {
+               DataType outputDataType,
+               @Nullable List<String> topics,
+               @Nullable Pattern topicPattern,
+               Properties properties,
+               DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
+               StartupMode startupMode,
+               Map<KafkaTopicPartition, Long> specificStartupOffsets,
+               long startupTimestampMillis) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka011DynamicSource(
 
        @Override
        protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       String topic,
+                       List<String> topics,
                        Properties properties,
                        DeserializationSchema<RowData> deserializationSchema) {
-               return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
+               return new FlinkKafkaConsumer011<>(topics, deserializationSchema, properties);
+       }
+
+       @Override
+       protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+               Pattern topicPattern,
+               Properties properties,
+               DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -70,17 +77,26 @@ public KafkaDynamicSource(
 
        @Override
        protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       String topic,
+                       List<String> topics,
                        Properties properties,
                        DeserializationSchema<RowData> deserializationSchema) {
-               return new FlinkKafkaConsumer<>(topic, deserializationSchema, properties);
+               return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
+       }
+
+       @Override
+       protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+               Pattern topicPattern,
+               Properties properties,
+               DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>
     </tr>
+    <tr>
+      <td><h5>topic-pattern</h5></td>
+      <td>optional for source(use 'topic' instead if not set)</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic pattern from which the table is read. It will use input value to build regex expression to discover matched topics.</td>

Review comment:
       The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -109,10 +133,18 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                // Validate the option values.
                validateTableOptions(tableOptions);
 
+               if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()){
+                       throw new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.");
+               }
+               String[] topics = tableOptions.get(TOPIC).split(",");
+               if (topics.length > 1) {
+                       throw new ValidationException("Flink Kafka sink currently doesn't support topic list.");
+               }

Review comment:
       What about having a `validateTableSinkOptions` and a `validateTableSourceOptions`? We could then move this validation into `validateSinkTopic()`.
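       Something along these lines, perhaps (a sketch inside `KafkaOptions`; `isSingleTopic` is the helper suggested in another comment on this PR, and the exact split of the remaining checks is only illustrative):

   ```java
   public static void validateTableSourceOptions(ReadableConfig tableOptions) {
           validateTopic(tableOptions);
           validateScanStartupMode(tableOptions);
   }

   public static void validateTableSinkOptions(ReadableConfig tableOptions) {
           validateSinkTopic(tableOptions);
           validateSinkPartitioner(tableOptions);
           validateSinkSemantic(tableOptions);
   }

   public static void validateSinkTopic(ReadableConfig tableOptions) {
           if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
                   throw new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.");
           }
           if (!isSingleTopic(tableOptions)) {
                   throw new ValidationException("Flink Kafka sink currently doesn't support topic list.");
           }
   }
   ```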

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -193,6 +219,9 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
                                                                        SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
                                                                        SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
                                                }
+                                               if (!tableOptions.getOptional(TOPIC).isPresent() || tableOptions.get(TOPIC).split(",").length > 1){

Review comment:
       Add a util method `boolean isSingleTopic(ReadableConfig)`. It can also be used on the sink side.
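       For example (a sketch; it assumes the comma-separated `TOPIC` option validated above):

   ```java
   /** Returns true only if the 'topic' option is set to exactly one topic (no list). */
   private static boolean isSingleTopic(ReadableConfig tableOptions) {
           return tableOptions.getOptional(TOPIC)
                   .map(value -> value.split(",").length == 1)
                   .orElse(false);
   }
   ```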
   

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>
     </tr>
+    <tr>
+      <td><h5>topic-pattern</h5></td>
+      <td>optional for source(use 'topic' instead if not set)</td>

Review comment:
       We can simplify this to `optional`.

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -187,6 +197,62 @@ public void testTableSourceCommitOnCheckpointsDisabled() {
                assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
        }
 
+       @Test
+       public void testTableSourceWithPattern() {
+               // prepare parameters for Kafka table source
+               final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+               final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+
+               DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+                       new TestFormatFactory.DecodingFormatMock(",", true);
+
+               // Construct table source using options and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "scanTable");
+
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> {
+                               options.remove("topic");
+                               options.put("topic-pattern", TOPIC_REGEX);
+                               options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+                               options.remove("scan.startup.specific-offsets");
+                       });
+               CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
+
+               final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+                       objectIdentifier,
+                       catalogTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader());
+
+               // Test scan source equals
+               final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
+                       producedDataType,
+                       null,
+                       Pattern.compile(TOPIC_REGEX),
+                       KAFKA_PROPERTIES,
+                       decodingFormat,
+                       StartupMode.EARLIEST,
+                       specificOffsets,

Review comment:
       Better to use `new HashMap<>()`.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>

Review comment:
       ```suggestion
  <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.</td>
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'.

Review comment:
       ```suggestion
  The config options `topic` and `topic-pattern` specify the topics or the topic pattern to consume for the source. The config option `topic` can accept a topic list using a comma separator like 'topic-1, topic-2'.
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +82,41 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
                ReadableConfig tableOptions = helper.getOptions();
-
-               String topic = tableOptions.get(TOPIC);
                DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                                DeserializationFormatFactory.class,
                                FactoryUtil.FORMAT);
+               Optional<String> topic = tableOptions.getOptional(TOPIC);
+               Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
                // Validate the option data type.
                helper.validateExcept(PROPERTIES_PREFIX);
                // Validate the option values.
                validateTableOptions(tableOptions);
 
                DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-               final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+
+               final StartupOptions startupOptions = getStartupOptions(tableOptions);
+               final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+               // add topic-partition discovery
+               properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                       String.valueOf(tableOptions
+                               .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+                               .map(val -> val.toMillis())
+                               .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
+
                return createKafkaTableSource(
-                               producedDataType,
-                               topic,
-                               getKafkaProperties(context.getCatalogTable().getOptions()),
-                               decodingFormat,
-                               startupOptions.startupMode,
-                               startupOptions.specificOffsets,
-                               startupOptions.startupTimestampMillis);
+                       producedDataType,
+                       topic.map(value ->
+                               Arrays
+                                       .stream(value.split(","))
+                                       .map(String::trim)
+                                       .collect(Collectors.toList()))
+                               .orElse(null),
+                       pattern.map(value -> Pattern.compile(value)).orElse(null),

Review comment:
       I would suggest adding static util methods `List<String> getTopics(ReadableConfig)` and `Pattern getTopicPattern(ReadableConfig)` to `KafkaOptions`.
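       A possible shape for those helpers (a sketch that mirrors the mapping code in the hunk above; names follow the suggestion and placement in `KafkaOptions` is assumed):

   ```java
   public static List<String> getTopics(ReadableConfig tableOptions) {
           // split the comma-separated 'topic' value into a trimmed list, or null if not set
           return tableOptions.getOptional(TOPIC)
                   .map(value -> Arrays.stream(value.split(","))
                           .map(String::trim)
                           .collect(Collectors.toList()))
                   .orElse(null);
   }

   public static Pattern getTopicPattern(ReadableConfig tableOptions) {
           // compile 'topic-pattern' into a Pattern, or null if not set
           return tableOptions.getOptional(TOPIC_PATTERN)
                   .map(Pattern::compile)
                   .orElse(null);
   }
   ```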

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -352,6 +442,49 @@ public void testInvalidSinkSemantic(){
                        new Configuration(),
                        Thread.currentThread().getContextClassLoader());
        }
+
+       @Test
+       public void testSinkWithTopicListOrTopicPattern(){
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "sinkTable");
+
+               Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> {
+                               options.put("topic", TOPICS);
+                               options.put("scan.startup.mode", "earliest-offset");
+                               options.remove("specific-offsets");
+                       });
+               CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support topic list.")));
+               FactoryUtil.createTableSink(
+                       null,
+                       objectIdentifier,
+                       sinkTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader());
+
+               modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> {
+                               options.put("topic-pattern", TOPIC_REGEX);
+                       });
+               sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.")));

Review comment:
       We shouldn't use `thrown` in one test multiple times, because only the first one will be triggered.
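       One way to restructure it is to give each expected exception its own test method, e.g. (a sketch reusing the calls from the code above; the extra startup-mode option tweaks from the first half are omitted for brevity):

   ```java
   @Test
   public void testSinkWithTopicList() {
           final ObjectIdentifier objectIdentifier = ObjectIdentifier.of("default", "default", "sinkTable");
           final CatalogTable sinkTable = createKafkaSinkCatalogTable(getModifiedOptions(
                   getFullSourceOptions(),
                   options -> options.put("topic", TOPICS)));

           // exactly one expected exception per test method
           thrown.expect(ValidationException.class);
           thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support topic list.")));
           FactoryUtil.createTableSink(
                   null,
                   objectIdentifier,
                   sinkTable,
                   new Configuration(),
                   Thread.currentThread().getContextClassLoader());
   }

   @Test
   public void testSinkWithTopicPattern() {
           final ObjectIdentifier objectIdentifier = ObjectIdentifier.of("default", "default", "sinkTable");
           final CatalogTable sinkTable = createKafkaSinkCatalogTable(getModifiedOptions(
                   getFullSourceOptions(),
                   options -> options.put("topic-pattern", TOPIC_REGEX)));

           thrown.expect(ValidationException.class);
           thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.")));
           FactoryUtil.createTableSink(
                   null,
                   objectIdentifier,
                   sinkTable,
                   new Configuration(),
                   Thread.currentThread().getContextClassLoader());
   }
   ```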

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -152,6 +159,13 @@ Connector Options
       <td>Long</td>
      <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.topic-partition-discovery.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(disabled)</td>

Review comment:
       ```suggestion
         <td style="word-wrap: break-word;">(none)</td>
   ```
   
   Currently, we don't have `disabled` as a `Default` value. We can add more explanation for the default value in the Description.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -152,6 +159,13 @@ Connector Options
       <td>Long</td>
      <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.topic-partition-discovery.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(disabled)</td>
+      <td>Duration</td>
+      <td>Optional interval for consumer to discover dynamically created Kafka partitions periodically.</td>

Review comment:
       ```suggestion
  <td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically.</td>
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>

Review comment:
       We can simplify this to `required for sink`.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'.
+The config option `topic-pattern` will use regex regression to discover the matched topic. The config option `scan.topic-partition-discovery.interval` enables Kafka connector to discover dynamically created Kafka partitions.
+
+Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-consumers-topic-and-partition-discovery) for more caveats about delivery guarantees.

Review comment:
       ```suggestion
  Please refer to [Kafka DataStream Connector documentation]({% link dev/connectors/kafka.md %}#kafka-consumers-topic-and-partition-discovery) for more about topic and partition discovery.
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'.
+The config option `topic-pattern` will use regex regression to discover the matched topic. The config option `scan.topic-partition-discovery.interval` enables Kafka connector to discover dynamically created Kafka partitions.

Review comment:
       ```suggestion
  The config option `topic-pattern` will use a regular expression to discover the matched topics. For example, if the `topic-pattern` is `test-topic-[0-9]`, then all topics with names that match the specified regular expression (starting with `test-topic-` and ending with a single digit) will be subscribed by the consumer when the job starts running.

  To allow the consumer to discover dynamically created topics after the job started running, set a non-negative value for `scan.topic-partition-discovery.interval`. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -187,6 +197,62 @@ public void testTableSourceCommitOnCheckpointsDisabled() {
                assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
        }
 
+       @Test
+       public void testTableSourceWithPattern() {
+               // prepare parameters for Kafka table source
+               final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+               final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+
+               DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+                       new TestFormatFactory.DecodingFormatMock(",", true);
+
+               // Construct table source using options and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "scanTable");
+
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> {
+                               options.remove("topic");
+                               options.put("topic-pattern", TOPIC_REGEX);
+                               options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+                               options.remove("scan.startup.specific-offsets");
+                       });
+               CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
+
+               final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+                       objectIdentifier,
+                       catalogTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader());
+
+               // Test scan source equals
+               final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
+                       producedDataType,
+                       null,
+                       Pattern.compile(TOPIC_REGEX),
+                       KAFKA_PROPERTIES,
+                       decodingFormat,
+                       StartupMode.EARLIEST,
+                       specificOffsets,
+                       0);
+               final KafkaDynamicSourceBase actualKafkaSource = (KafkaDynamicSourceBase) actualSource;
+               assertEquals(actualKafkaSource, expectedKafkaSource);
+
+               // Test Kafka consumer
+               ScanTableSource.ScanRuntimeProvider provider =
+                       actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+               assertThat(provider, instanceOf(SourceFunctionProvider.class));
+               final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+               final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
+               assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
+               //  Test commitOnCheckpoints flag should be true when set consumer group
+               assertTrue(((FlinkKafkaConsumerBase) sourceFunction).getEnableCommitOnCheckpoints());

Review comment:
       This has been verified in the other test. We don't need to test it again here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

