twalthr commented on a change in pull request #16809:
URL: https://github.com/apache/flink/pull/16809#discussion_r688431707



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -137,6 +148,11 @@
     /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
     protected final boolean upsertMode;
 
+    /** Whether to use legacy {@link FlinkKafkaConsumer} for runtime execution. */
+    protected final boolean fallbackToFlinkKafkaConsumer;

Review comment:
       For the sink we decided against a legacy switch, but maybe it makes sense here if the code duplication and the differences to the legacy implementation are low.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -201,10 +221,32 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
         final TypeInformation<RowData> producedTypeInfo =
                 context.createTypeInformation(producedDataType);
 
-        final FlinkKafkaConsumer<RowData> kafkaConsumer =
-                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
+        if (fallbackToFlinkKafkaConsumer) {
+            final FlinkKafkaConsumer<RowData> kafkaConsumer =
+                    createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
+            return SourceFunctionProvider.of(kafkaConsumer, false);
+        }
 
-        return SourceFunctionProvider.of(kafkaConsumer, false);
+        final KafkaSource<RowData> kafkaSource =
+                createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
+
+        return new DataStreamScanProvider() {

Review comment:
       why can't we use `SourceProvider`?
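       For illustration, a minimal sketch of what this branch could return if `SourceProvider` were used directly (assuming no extra DataStream-level customization is needed here):

```java
// Sketch only, not the PR's current code: hand the new KafkaSource to the planner
// via SourceProvider instead of wrapping it in an anonymous DataStreamScanProvider.
// Requires org.apache.flink.table.connector.source.SourceProvider.
final KafkaSource<RowData> kafkaSource =
        createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
return SourceProvider.of(kafkaSource);
```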

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -329,11 +375,101 @@ public int hashCode() {
                 specificStartupOffsets,
                 startupTimestampMillis,
                 upsertMode,
+                tableIdentifier,
+                fallbackToFlinkKafkaConsumer,
                 watermarkStrategy);
     }
 
     // --------------------------------------------------------------------------------------------
 
+    protected KafkaSource<RowData> createKafkaSource(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity =
+                producedDataType.getChildren().size() - metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata columns at the end
+        final int[] adjustedValueProjection =
+                IntStream.concat(
+                                IntStream.of(valueProjection),
+                                IntStream.range(
+                                        keyProjection.length + valueProjection.length,
+                                        adjustedPhysicalArity))
+                        .toArray();
+
+        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
+                new DynamicKafkaDeserializationSchema(
+                        adjustedPhysicalArity,
+                        keyDeserialization,
+                        keyProjection,
+                        valueDeserialization,
+                        adjustedValueProjection,
+                        hasMetadata,
+                        metadataConverters,
+                        producedTypeInfo,
+                        upsertMode);
+
+        final KafkaSourceBuilder<RowData> kafkaSourceBuilder = KafkaSource.builder();

Review comment:
       All the code up to here could be shared with the legacy implementation to avoid code duplication.
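       To illustrate the kind of sharing meant here (the helper name `createDeserialization` is only an assumption), the block above could be pulled into one method that both `createKafkaConsumer` and `createKafkaSource` call:

```java
// Hypothetical extraction: the projection/metadata handling shared by the legacy
// FlinkKafkaConsumer path and the new KafkaSource path lives in a single helper.
private KafkaDeserializationSchema<RowData> createDeserialization(
        DeserializationSchema<RowData> keyDeserialization,
        DeserializationSchema<RowData> valueDeserialization,
        TypeInformation<RowData> producedTypeInfo) {
    final MetadataConverter[] metadataConverters =
            metadataKeys.stream()
                    .map(
                            k ->
                                    Stream.of(ReadableMetadata.values())
                                            .filter(rm -> rm.key.equals(k))
                                            .findFirst()
                                            .orElseThrow(IllegalStateException::new))
                    .map(m -> m.converter)
                    .toArray(MetadataConverter[]::new);

    // check if connector metadata is used at all
    final boolean hasMetadata = metadataKeys.size() > 0;

    // adjust physical arity with value format's metadata
    final int adjustedPhysicalArity =
            producedDataType.getChildren().size() - metadataKeys.size();

    // adjust value format projection to include value format's metadata columns at the end
    final int[] adjustedValueProjection =
            IntStream.concat(
                            IntStream.of(valueProjection),
                            IntStream.range(
                                    keyProjection.length + valueProjection.length,
                                    adjustedPhysicalArity))
                    .toArray();

    return new DynamicKafkaDeserializationSchema(
            adjustedPhysicalArity,
            keyDeserialization,
            keyProjection,
            valueDeserialization,
            adjustedValueProjection,
            hasMetadata,
            metadataConverters,
            producedTypeInfo,
            upsertMode);
}
```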

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -150,7 +166,9 @@ public KafkaDynamicSource(
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
-            boolean upsertMode) {
+            boolean upsertMode,
+            ObjectIdentifier objectIdentifier,

Review comment:
       nit: instead of passing catalog objects into the source, how about creating a `String sourceName` in the factory already? This avoids calling `asSummaryString` in multiple locations.
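       Roughly like this (a sketch; the `sourceName` variable and the adjusted constructor parameter are assumptions, not what the PR currently does):

```java
// In KafkaDynamicTableFactory#createDynamicTableSource: derive the name once from
// the factory context and pass a plain String into the source.
final String sourceName = context.getObjectIdentifier().asSummaryString();

// Inside KafkaDynamicSource the generated group id would then simply be:
kafkaSourceBuilder.setGroupId("KafkaSource-" + sourceName);
```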

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -329,11 +375,101 @@ public int hashCode() {
                 specificStartupOffsets,
                 startupTimestampMillis,
                 upsertMode,
+                tableIdentifier,
+                fallbackToFlinkKafkaConsumer,
                 watermarkStrategy);
     }
 
     // --------------------------------------------------------------------------------------------
 
+    protected KafkaSource<RowData> createKafkaSource(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity =
+                producedDataType.getChildren().size() - metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata columns at the end
+        final int[] adjustedValueProjection =
+                IntStream.concat(
+                                IntStream.of(valueProjection),
+                                IntStream.range(
+                                        keyProjection.length + valueProjection.length,
+                                        adjustedPhysicalArity))
+                        .toArray();
+
+        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
+                new DynamicKafkaDeserializationSchema(
+                        adjustedPhysicalArity,
+                        keyDeserialization,
+                        keyProjection,
+                        valueDeserialization,
+                        adjustedValueProjection,
+                        hasMetadata,
+                        metadataConverters,
+                        producedTypeInfo,
+                        upsertMode);
+
+        final KafkaSourceBuilder<RowData> kafkaSourceBuilder = KafkaSource.builder();
+
+        if (topics != null) {
+            kafkaSourceBuilder.setTopics(topics);
+        } else {
+            kafkaSourceBuilder.setTopicPattern(topicPattern);
+        }
+
+        // For compatibility with legacy source that is not validating group id
+        if (!properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            kafkaSourceBuilder.setGroupId("KafkaSource-" + tableIdentifier.asSummaryString());

Review comment:
       shall we add a warning to the log to inform the user that the DDL needs 
to be updated?
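       Something along these lines, perhaps (a sketch; assumes an slf4j `LOG` field on `KafkaDynamicSource`, which the class may not have yet):

```java
// For compatibility with the legacy source, which did not validate the group id:
// generate one, but warn so users know to add 'properties.group.id' to their DDL.
if (!properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
    LOG.warn(
            "Option 'properties.group.id' is not set for table {}; a group id will be "
                    + "generated. Please update the DDL to set it explicitly.",
            tableIdentifier.asSummaryString());
    kafkaSourceBuilder.setGroupId("KafkaSource-" + tableIdentifier.asSummaryString());
}
```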

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -213,14 +226,18 @@ public void testTableSource() {
         // Test Kafka consumer
                ScanTableSource.ScanRuntimeProvider provider =
                        actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-        assertThat(provider, instanceOf(SourceFunctionProvider.class));
-        final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
-        final SourceFunction<RowData> sourceFunction =
-                sourceFunctionProvider.createSourceFunction();
-        assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
-
-        // Test commitOnCheckpoints flag should be true when set consumer group
-        assertTrue(((FlinkKafkaConsumer<?>) sourceFunction).getEnableCommitOnCheckpoints());
+        if (fallbackToFlinkKafkaConsumer()) {
+            assertThat(provider, instanceOf(SourceFunctionProvider.class));
+            final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+            final SourceFunction<RowData> sourceFunction =
+                    sourceFunctionProvider.createSourceFunction();
+            assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
+
+            // Test commitOnCheckpoints flag should be true when set consumer group
+            assertTrue(((FlinkKafkaConsumer<?>) sourceFunction).getEnableCommitOnCheckpoints());
+        } else {
+            assertThat(provider, instanceOf(DataStreamScanProvider.class));

Review comment:
       can we test more if we use `SourceProvider`?
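       For example, with `SourceProvider` the test could unwrap the provider and check the concrete source type (a sketch, assuming the provider were created via `SourceProvider.of(kafkaSource)`):

```java
assertThat(provider, instanceOf(SourceProvider.class));
final SourceProvider sourceProvider = (SourceProvider) provider;
// createSource() exposes the underlying Source, so the concrete type can be asserted.
assertThat(sourceProvider.createSource(), instanceOf(KafkaSource.class));
```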




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

