lirui-apache commented on a change in pull request #16809:
URL: https://github.com/apache/flink/pull/16809#discussion_r689360512
##########
File path: docs/content/docs/connectors/table/kafka.md
##########
@@ -217,7 +217,7 @@ Connector Options
<td>required by source</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>The id of the consumer group for Kafka source, optional for Kafka sink.</td>
+ <td>The id of the consumer group for Kafka source, optional for Kafka sink. If group ID is not specified, an automatically generated id "KafkaSource-{tableIdentifier}" will be used.</td>
Review comment:
There seems to be some inconsistency in the doc. Let's be clear about whether it's required by source.
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -128,7 +132,7 @@
static {
KAFKA_SOURCE_PROPERTIES.setProperty("group.id", "dummy");
KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy");
- KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000");
+ KAFKA_SOURCE_PROPERTIES.setProperty("partition.discovery.interval.ms", "1000");
Review comment:
Can we use `KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key()` instead of a hard-coded string?
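For example, the test setup could reference the option's key rather than the literal string (a sketch of the suggested change, assuming `KafkaSourceOptions` is importable from the test's classpath; this fragment belongs inside the existing static initializer):

```java
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;

// Sketch: use the option's key() accessor instead of the literal
// "partition.discovery.interval.ms", so a future rename of the option
// cannot silently diverge from the test.
KAFKA_SOURCE_PROPERTIES.setProperty(
        KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "1000");
```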
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -882,7 +864,7 @@ public void testPrimaryKeyValidation() {
// Utilities
// --------------------------------------------------------------------------------------------
- private static KafkaDynamicSource createExpectedScanSource(
+ private KafkaDynamicSource createExpectedScanSource(
Review comment:
Why not static anymore?
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -212,17 +211,9 @@ public void testTableSource() {
0);
assertEquals(actualKafkaSource, expectedKafkaSource);
- // Test Kafka consumer
ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
- assertThat(provider, instanceOf(SourceFunctionProvider.class));
- final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
- final SourceFunction<RowData> sourceFunction =
- sourceFunctionProvider.createSourceFunction();
- assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
-
- // Test commitOnCheckpoints flag should be true when set consumer group
- assertTrue(((FlinkKafkaConsumer<?>) sourceFunction).getEnableCommitOnCheckpoints());
Review comment:
Why do we no longer have to assert this?
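If the check is still wanted with the new `KafkaSource`, one possibility might be asserting on the equivalent consumer property instead of the removed `FlinkKafkaConsumer` flag (a hedged sketch only; `getKafkaProperties()` is a hypothetical accessor used for illustration, since `KafkaSource` does not publicly expose its properties, and `KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT` is assumed to be the governing option):

```java
// Hedged sketch: with KafkaSource, commit-on-checkpoint behavior is governed
// by the "commit.offsets.on.checkpoint" consumer option rather than the old
// FlinkKafkaConsumer flag. A replacement assertion could check that property.
// NOTE: getKafkaProperties() is a hypothetical accessor for illustration; the
// test may need reflection or a package-private getter to reach the props.
Properties props = kafkaSource.getKafkaProperties();
assertEquals(
        "true",
        props.getProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
```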
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]