Savonitar commented on code in PR #174:
URL: https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2746515276

########## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FormatProjectionPushdownLevel.java: ##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Projection pushdown mode for {@link
+ * org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
+ */
+@Internal
+public enum FormatProjectionPushdownLevel {
+
+    /** The format does not support any kind of projection pushdown. */
+    NONE,
+
+    /** The format supports projection pushdown for top-level fields only. */
+    TOP_LEVEL,
+
+    /** The format supports projection pushdown for top-level and nested fields. */
+    ALL

Review Comment:
Do we validate that the configured format is compatible with the requested projection pushdown level, to prevent unsupported combinations?
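
To make the concern concrete, a factory-time check could look roughly like the sketch below. The helper class, its name, and the idea of keying the check on `ProjectableDecodingFormat` are illustrative assumptions on my side, not something taken from this PR:

```java
import org.apache.flink.streaming.connectors.kafka.config.FormatProjectionPushdownLevel;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;

/** Hypothetical helper: fail fast when a pushdown level is requested that the format cannot honour. */
final class ProjectionPushdownValidation {

    private ProjectionPushdownValidation() {}

    static void validate(DecodingFormat<?> format, FormatProjectionPushdownLevel level) {
        // NONE never needs format support; a missing (key) format has nothing to validate.
        if (format == null || level == FormatProjectionPushdownLevel.NONE) {
            return;
        }
        // Both TOP_LEVEL and ALL require the format to accept projected fields at all.
        if (!(format instanceof ProjectableDecodingFormat)) {
            throw new ValidationException(
                    "Projection pushdown level '"
                            + level
                            + "' was configured, but the decoding format does not support projection pushdown.");
        }
    }
}
```

Wiring something like this into the table factory would turn a silently ignored setting into an early `ValidationException`.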

########## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java: ##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
     public boolean supportsMetadataProjection() {
-        return false;
+        throw new IllegalStateException(
+                "This should never be called as KafkaDynamicSource implements the SupportsProjectionPushdown interface.");
     }
 
     @Override
     public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
     }
 
+    @Override
+    public boolean supportsNestedProjection() {
+        return (keyDecodingFormat == null
+                        || keyFormatProjectionPushdownLevel == FormatProjectionPushdownLevel.ALL)
+                && valueFormatProjectionPushdownLevel == FormatProjectionPushdownLevel.ALL;
+    }
+
+    @Override
+    public void applyProjection(final int[][] projectedFields, final DataType producedDataType) {
+        this.projectedPhysicalFields = projectedFields;

Review Comment:
Should we also update `this.producedDataType` here?
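
Just to make the suggestion concrete, I mean something along these lines (a sketch only; whether it is right depends on how `producedDataType` is consumed elsewhere in `KafkaDynamicSource`):

```java
@Override
public void applyProjection(final int[][] projectedFields, final DataType producedDataType) {
    this.projectedPhysicalFields = projectedFields;
    // Sketch: also remember the narrowed schema the planner hands in, so that later
    // type handling works on the projected row type rather than the full one.
    this.producedDataType = producedDataType;
}
```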

########## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java: ##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
     public boolean supportsMetadataProjection() {
-        return false;
+        throw new IllegalStateException(

Review Comment:
Should this throw an exception instead of returning a boolean? Could you please elaborate on the reasoning for throwing here? E.g. what if this method is called for logging purposes? I've checked other Flink connectors and they either return `false` or use the default (return `true`); none of them throw an exception.

########## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/FormatProjectionPushdownLevel.java: ##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Projection pushdown mode for {@link
+ * org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
+ */
+@Internal
+public enum FormatProjectionPushdownLevel {

Review Comment:
This enum (`NONE`, `TOP_LEVEL`, `ALL`) requires users to understand:
1. whether the format supports projection pushdown,
2. whether the format supports nested projections,
3. known bugs in specific formats (e.g. Avro / FLINK-35324, as mentioned in the PR description).

Maybe we could replace the per-format level configuration with a single boolean flag:
```
'projection-pushdown.enabled' = 'true'
```
and let the format decide internally which kind of projection it can apply. Would this approach address the use cases you had in mind? Or is the level added intentionally as a potential workaround for problems with specific formats?
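
For reference, the single-flag alternative could surface as a connector option along these lines. This is just a sketch: the option name simply mirrors the string above, and the class, placement, default value, and description are assumptions rather than anything taken from this PR:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Hypothetical home for the single-flag option sketched in the comment above. */
public final class ProjectionPushdownOptions {

    private ProjectionPushdownOptions() {}

    // Defaulting to false would keep today's behaviour unless a user opts in.
    public static final ConfigOption<Boolean> PROJECTION_PUSHDOWN_ENABLED =
            ConfigOptions.key("projection-pushdown.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "If true, the Kafka source pushes projections down to the key and "
                                    + "value formats; each format decides whether it can apply "
                                    + "top-level or nested projection.");
}
```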

########## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java: ##########

Review Comment:
Could you please clarify why `DynamicKafkaTableSource` does not implement `SupportsProjectionPushDown`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]