Savonitar commented on code in PR #174:
URL: https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2746515276


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FormatProjectionPushdownLevel.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Projection pushdown mode for {@link
+ * org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
+ */
+@Internal
+public enum FormatProjectionPushdownLevel {
+
+    /** The format does not support any kind of projection pushdown. */
+    NONE,
+
+    /** The format supports projection pushdown for top-level fields only. */
+    TOP_LEVEL,
+
+    /** The format supports projection pushdown for top-level and nested fields. */
+    ALL

Review Comment:
   Do we validate format compatibility with projection pushdown levels, to prevent unsupported combinations?
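   
   For illustration, a minimal validation sketch (the helper name and where it would live in the factory are hypothetical, not part of this PR; `ProjectableDecodingFormat` and `ValidationException` are existing Flink classes):
   ```java
   import org.apache.flink.streaming.connectors.kafka.config.FormatProjectionPushdownLevel;
   import org.apache.flink.table.api.ValidationException;
   import org.apache.flink.table.connector.format.DecodingFormat;
   import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
   
   // Hypothetical helper: reject a configured pushdown level that the format cannot honor.
   static void validatePushdownLevel(DecodingFormat<?> format, FormatProjectionPushdownLevel level) {
       if (level != FormatProjectionPushdownLevel.NONE
               && !(format instanceof ProjectableDecodingFormat)) {
           throw new ValidationException(
                   "Projection pushdown level '" + level + "' is configured, "
                           + "but the format does not implement ProjectableDecodingFormat.");
       }
   }
   ```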



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
     public boolean supportsMetadataProjection() {
-        return false;
+        throw new IllegalStateException(
+                "This should never be called as KafkaDynamicSource implements 
the SupportsProjectionPushdown interface.");
     }
 
     @Override
     public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
     }
 
+    @Override
+    public boolean supportsNestedProjection() {
+        return (keyDecodingFormat == null
+                        || keyFormatProjectionPushdownLevel == FormatProjectionPushdownLevel.ALL)
+                && valueFormatProjectionPushdownLevel == FormatProjectionPushdownLevel.ALL;
+    }
+
+    @Override
+    public void applyProjection(final int[][] projectedFields, final DataType producedDataType) {
+        this.projectedPhysicalFields = projectedFields;

Review Comment:
   Should we also update `this.producedDataType` here?
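   
   For reference, a minimal sketch of what that could look like (assuming the `producedDataType` handed in by the planner is already the projected schema, so it can simply be stored alongside the projected field paths):
   ```java
   @Override
   public void applyProjection(final int[][] projectedFields, final DataType producedDataType) {
       this.projectedPhysicalFields = projectedFields;
       // Sketch: also keep the projected schema so downstream type construction
       // sees the narrowed row type rather than the full physical schema.
       this.producedDataType = producedDataType;
   }
   ```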



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
     public boolean supportsMetadataProjection() {
-        return false;
+        throw new IllegalStateException(

Review Comment:
   Should this throw an exception instead of returning a boolean? Could you please elaborate on why an exception is thrown here?
   
   E.g. what if this method is called for logging purposes?
   
   I've checked other Flink connectors, and they either return `false` or use the default (`return true`); none of them throw an exception.
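   
   A sketch of the suggested alternative, keeping the boolean contract (whether to return `false` or rely on the default is a separate judgment call, this only illustrates the shape):
   ```java
   @Override
   public boolean supportsMetadataProjection() {
       // Metadata projection is already covered by the SupportsProjectionPushDown
       // implementation, so callers (including any logging) get a plain answer
       // instead of an exception.
       return false;
   }
   ```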



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FormatProjectionPushdownLevel.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Projection pushdown mode for {@link
+ * org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
+ */
+@Internal
+public enum FormatProjectionPushdownLevel {

Review Comment:
   This enum (`NONE`, `TOP_LEVEL`, `ALL`) requires users to understand:
   1. Whether the format supports projection pushdown
   2. Whether the format supports nested projections
   3. Known bugs in specific formats (e.g. the Avro issue FLINK-35324 mentioned in the PR description)
   
   Maybe we can replace the per-format level configuration with a single boolean flag:
   ```
   'projection-pushdown.enabled' = 'true'
   ```
   and internally the format will decide which projection it can use.
   
   Would this approach address the use cases you had in mind? Or is this added intentionally as a potential workaround for problems with specific formats?
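   
   For concreteness, a sketch of what that single flag could look like as a connector option (the option key is only the one suggested above, not an existing Kafka connector option):
   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   
   // Hypothetical single-flag alternative to the per-format level enum.
   public static final ConfigOption<Boolean> PROJECTION_PUSHDOWN_ENABLED =
           ConfigOptions.key("projection-pushdown.enabled")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription(
                           "If enabled, projections are pushed into the key/value formats; "
                                   + "each format decides whether it can honor top-level or nested projections.");
   ```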
   



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java:
##########


Review Comment:
   Could you please clarify why `DynamicKafkaTableSource` does not implement `SupportsProjectionPushDown`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
