fqshopify commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2756058901
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
- // separate connector and format metadata
- final List<String> formatMetadataKeys =
- metadataKeys.stream()
- .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
- .collect(Collectors.toList());
- final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
- connectorMetadataKeys.removeAll(formatMetadataKeys);
-
- // push down format metadata
- final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
- if (formatMetadata.size() > 0) {
- final List<String> requestedFormatMetadataKeys =
- formatMetadataKeys.stream()
- .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
- .collect(Collectors.toList());
- valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ this.valueFormatMetadataKeys = new ArrayList<>();
+ this.metadataKeys = new ArrayList<>();
+ for (final String key : metadataKeys) {
+ if (key.startsWith(VALUE_METADATA_PREFIX)) {
+ final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+ this.valueFormatMetadataKeys.add(formatMetadataKey);
+ } else {
+ this.metadataKeys.add(key);
+ }
}
-
- this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
public boolean supportsMetadataProjection() {
- return false;
+ throw new IllegalStateException(
Review Comment:
From the javadoc for `supportsMetadataProjection`:
```java
/**
* ...
*
* <p>This method is only called if the source does <em>not</em> implement {@link
* SupportsProjectionPushDown}.
*
* ...
*/
default boolean supportsMetadataProjection() {
```
Since `KafkaDynamicSource` now implements `SupportsProjectionPushDown`, this
method should never be called by the planner. Originally, my reasoning for
throwing an exception here was to catch any unexpected invocations that would
indicate a bug in the planner.
That said, I take your point about consistency with other connectors and
potential issues if it's called for logging/debugging purposes. I've now
removed the override entirely, so the interface's default implementation
(which returns `true`) applies 👍
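
For illustration, a minimal sketch of the difference between the two revisions. It assumes a hypothetical stand-in interface (`ReadingMetadata`) rather than Flink's real `SupportsReadingMetadata`, and the class names and exception message are made up for the example:

```java
public class DefaultMethodSketch {

    // Hypothetical stand-in for the Flink interface; only the default method is modeled.
    public interface ReadingMetadata {
        default boolean supportsMetadataProjection() {
            return true;
        }
    }

    // Mirrors the earlier revision: any call to the override fails loudly.
    // The message text is an assumption, not the one used in the PR.
    public static class ThrowingSource implements ReadingMetadata {
        @Override
        public boolean supportsMetadataProjection() {
            throw new IllegalStateException("not expected to be called");
        }
    }

    // Mirrors the current revision: no override, so the interface default applies.
    public static class DefaultingSource implements ReadingMetadata {}

    public static void main(String[] args) {
        // A caller that only wants to log the value no longer risks an exception.
        System.out.println(new DefaultingSource().supportsMetadataProjection()); // prints true
    }
}
```

With the override gone, an incidental call (for example from logging) gets the default value instead of an `IllegalStateException`.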