twalthr commented on a change in pull request #16809:
URL: https://github.com/apache/flink/pull/16809#discussion_r688431707
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -137,6 +148,11 @@
     /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
     protected final boolean upsertMode;
+    /** Whether to use legacy {@link FlinkKafkaConsumer} for runtime execution. */
+    protected final boolean fallbackToFlinkKafkaConsumer;
Review comment:
For the sink we decided against a legacy switch, but maybe it makes sense here if the code duplication and the differences to the legacy implementation are low.
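If the switch stays, it could be surfaced as a boolean option. A minimal sketch, with a purely hypothetical option key and holder class (this is not necessarily what the PR uses):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    /** Hypothetical holder class; the option key below is an assumption. */
    public class KafkaLegacySwitchOptions {
        public static final ConfigOption<Boolean> SCAN_FALLBACK_TO_FLINK_KAFKA_CONSUMER =
                ConfigOptions.key("scan.fallback-to-flink-kafka-consumer")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription(
                                "Whether to use the legacy FlinkKafkaConsumer for runtime "
                                        + "execution instead of the new KafkaSource.");
    }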
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -201,10 +221,32 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);
-        final FlinkKafkaConsumer<RowData> kafkaConsumer =
-                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
+        if (fallbackToFlinkKafkaConsumer) {
+            final FlinkKafkaConsumer<RowData> kafkaConsumer =
+                    createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
+            return SourceFunctionProvider.of(kafkaConsumer, false);
+        }
-        return SourceFunctionProvider.of(kafkaConsumer, false);
+        final KafkaSource<RowData> kafkaSource =
+                createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
+
+        return new DataStreamScanProvider() {
Review comment:
Why can't we use `SourceProvider`?
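For reference, a minimal sketch of how the return could look with `SourceProvider`, assuming nothing else needs to be attached to the DataStream:

    import org.apache.flink.table.connector.source.SourceProvider;

    final KafkaSource<RowData> kafkaSource =
            createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
    // Let the planner wire the unified Source directly, without custom DataStream code.
    return SourceProvider.of(kafkaSource);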
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -329,11 +375,101 @@ public int hashCode() {
                specificStartupOffsets,
                startupTimestampMillis,
                upsertMode,
+                tableIdentifier,
+                fallbackToFlinkKafkaConsumer,
                watermarkStrategy);
    }
    // --------------------------------------------------------------------------------------------
+    protected KafkaSource<RowData> createKafkaSource(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity =
+                producedDataType.getChildren().size() - metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata columns at the end
+        final int[] adjustedValueProjection =
+                IntStream.concat(
+                                IntStream.of(valueProjection),
+                                IntStream.range(
+                                        keyProjection.length + valueProjection.length,
+                                        adjustedPhysicalArity))
+                        .toArray();
+
+        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
+                new DynamicKafkaDeserializationSchema(
+                        adjustedPhysicalArity,
+                        keyDeserialization,
+                        keyProjection,
+                        valueDeserialization,
+                        adjustedValueProjection,
+                        hasMetadata,
+                        metadataConverters,
+                        producedTypeInfo,
+                        upsertMode);
+
+        final KafkaSourceBuilder<RowData> kafkaSourceBuilder = KafkaSource.builder();
Review comment:
All of the code up to this point could be shared with the legacy implementation to avoid code duplication.
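One way to share it, sketched with a hypothetical `createKafkaDeserializer` helper that both `createKafkaConsumer` and `createKafkaSource` could delegate to (the body is lifted from the hunk above):

    // Hypothetical helper name; builds the row deserializer used by both runtime paths.
    private KafkaDeserializationSchema<RowData> createKafkaDeserializer(
            DeserializationSchema<RowData> keyDeserialization,
            DeserializationSchema<RowData> valueDeserialization,
            TypeInformation<RowData> producedTypeInfo) {
        final MetadataConverter[] metadataConverters =
                metadataKeys.stream()
                        .map(
                                k ->
                                        Stream.of(ReadableMetadata.values())
                                                .filter(rm -> rm.key.equals(k))
                                                .findFirst()
                                                .orElseThrow(IllegalStateException::new))
                        .map(m -> m.converter)
                        .toArray(MetadataConverter[]::new);

        final boolean hasMetadata = metadataKeys.size() > 0;
        final int adjustedPhysicalArity =
                producedDataType.getChildren().size() - metadataKeys.size();
        final int[] adjustedValueProjection =
                IntStream.concat(
                                IntStream.of(valueProjection),
                                IntStream.range(
                                        keyProjection.length + valueProjection.length,
                                        adjustedPhysicalArity))
                        .toArray();

        return new DynamicKafkaDeserializationSchema(
                adjustedPhysicalArity,
                keyDeserialization,
                keyProjection,
                valueDeserialization,
                adjustedValueProjection,
                hasMetadata,
                metadataConverters,
                producedTypeInfo,
                upsertMode);
    }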
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -150,7 +166,9 @@ public KafkaDynamicSource(
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis,
-            boolean upsertMode) {
+            boolean upsertMode,
+            ObjectIdentifier objectIdentifier,
Review comment:
Nit: instead of passing catalog objects into the source, how about already creating a `String sourceName` in the factory? This avoids calling `asSummaryString` at multiple locations.
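A rough sketch of the factory side, assuming `KafkaDynamicTableFactory` derives the name from `Context#getObjectIdentifier` (the variable and parameter names are only illustrative):

    // In KafkaDynamicTableFactory#createDynamicTableSource(Context context):
    final String sourceName = "KafkaSource-" + context.getObjectIdentifier().asSummaryString();
    // ...then pass sourceName into the KafkaDynamicSource constructor instead of an ObjectIdentifier.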
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -329,11 +375,101 @@ public int hashCode() {
                specificStartupOffsets,
                startupTimestampMillis,
                upsertMode,
+                tableIdentifier,
+                fallbackToFlinkKafkaConsumer,
                watermarkStrategy);
    }
    // --------------------------------------------------------------------------------------------
+    protected KafkaSource<RowData> createKafkaSource(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity =
+                producedDataType.getChildren().size() - metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata columns at the end
+        final int[] adjustedValueProjection =
+                IntStream.concat(
+                                IntStream.of(valueProjection),
+                                IntStream.range(
+                                        keyProjection.length + valueProjection.length,
+                                        adjustedPhysicalArity))
+                        .toArray();
+
+        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
+                new DynamicKafkaDeserializationSchema(
+                        adjustedPhysicalArity,
+                        keyDeserialization,
+                        keyProjection,
+                        valueDeserialization,
+                        adjustedValueProjection,
+                        hasMetadata,
+                        metadataConverters,
+                        producedTypeInfo,
+                        upsertMode);
+
+        final KafkaSourceBuilder<RowData> kafkaSourceBuilder = KafkaSource.builder();
+
+        if (topics != null) {
+            kafkaSourceBuilder.setTopics(topics);
+        } else {
+            kafkaSourceBuilder.setTopicPattern(topicPattern);
+        }
+
+        // For compatibility with legacy source that is not validating group id
+        if (!properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            kafkaSourceBuilder.setGroupId("KafkaSource-" + tableIdentifier.asSummaryString());
Review comment:
Shall we add a warning to the log to inform the user that the DDL needs to be updated?
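Something along these lines, assuming an SLF4J `LOG` field is available in `KafkaDynamicSource` (the exact message and option name are only a suggestion):

    if (!properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
        // Warn so users know the DDL should declare a consumer group explicitly.
        LOG.warn(
                "Consumer group id is not specified for table '{}'; a group id will be generated. "
                        + "Please consider setting 'properties.group.id' in the DDL.",
                tableIdentifier.asSummaryString());
        kafkaSourceBuilder.setGroupId("KafkaSource-" + tableIdentifier.asSummaryString());
    }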
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -213,14 +226,18 @@ public void testTableSource() {
        // Test Kafka consumer
        ScanTableSource.ScanRuntimeProvider provider =
                actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-        assertThat(provider, instanceOf(SourceFunctionProvider.class));
-        final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
-        final SourceFunction<RowData> sourceFunction =
-                sourceFunctionProvider.createSourceFunction();
-        assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
-
-        // Test commitOnCheckpoints flag should be true when set consumer group
-        assertTrue(((FlinkKafkaConsumer<?>) sourceFunction).getEnableCommitOnCheckpoints());
+        if (fallbackToFlinkKafkaConsumer()) {
+            assertThat(provider, instanceOf(SourceFunctionProvider.class));
+            final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+            final SourceFunction<RowData> sourceFunction =
+                    sourceFunctionProvider.createSourceFunction();
+            assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
+
+            // Test commitOnCheckpoints flag should be true when set consumer group
+            assertTrue(((FlinkKafkaConsumer<?>) sourceFunction).getEnableCommitOnCheckpoints());
+        } else {
+            assertThat(provider, instanceOf(DataStreamScanProvider.class));
Review comment:
Can we test more if we use `SourceProvider`?
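If the runtime provider becomes a `SourceProvider`, the test could go one step further and check the concrete source type, e.g. (a sketch, assuming the corresponding imports):

    assertThat(provider, instanceOf(SourceProvider.class));
    final SourceProvider sourceProvider = (SourceProvider) provider;
    // SourceProvider exposes the unified Source directly, so we can assert on it.
    final Source<RowData, ?, ?> source = sourceProvider.createSource();
    assertThat(source, instanceOf(KafkaSource.class));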
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]