Savonitar commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2746696344
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
- // separate connector and format metadata
- final List<String> formatMetadataKeys =
- metadataKeys.stream()
- .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
- .collect(Collectors.toList());
- final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
- connectorMetadataKeys.removeAll(formatMetadataKeys);
-
- // push down format metadata
- final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
- if (formatMetadata.size() > 0) {
- final List<String> requestedFormatMetadataKeys =
- formatMetadataKeys.stream()
- .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
- .collect(Collectors.toList());
- valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ this.valueFormatMetadataKeys = new ArrayList<>();
+ this.metadataKeys = new ArrayList<>();
+ for (final String key : metadataKeys) {
+ if (key.startsWith(VALUE_METADATA_PREFIX)) {
+ final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+ this.valueFormatMetadataKeys.add(formatMetadataKey);
+ } else {
+ this.metadataKeys.add(key);
+ }
}
-
- this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
public boolean supportsMetadataProjection() {
- return false;
+ throw new IllegalStateException(
Review Comment:
Should this throw an exception instead of returning a boolean? Could you
please elaborate on why it throws?
For example, what happens if this method is called only for logging purposes?
I've checked the other `flink-connectors`: they either return false or use
the default (return true); none of them throws an exception.
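
To make the concern concrete, here is a minimal sketch. It deliberately does not use Flink's real interfaces: `MetadataCapable`, `OptOutSource`, `ThrowingSource`, `CapabilityLogger`, and the exception message are all hypothetical names introduced for illustration. It assumes only that a capability query may be invoked by code that merely wants to report the answer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a capability interface; NOT Flink's actual API.
interface MetadataCapable {
    // Mirrors the "use the default (return true)" convention mentioned above.
    default boolean supportsMetadataProjection() {
        return true;
    }
}

// Opts out by returning false: safe to query from any caller.
class OptOutSource implements MetadataCapable {
    @Override
    public boolean supportsMetadataProjection() {
        return false;
    }
}

// Throws instead of answering: every caller now has to guard the call.
class ThrowingSource implements MetadataCapable {
    @Override
    public boolean supportsMetadataProjection() {
        // Hypothetical message, not the one used in the PR.
        throw new IllegalStateException("must not be called");
    }
}

// Hypothetical diagnostic helper that only reports capabilities.
class CapabilityLogger {
    static List<String> describe(List<? extends MetadataCapable> sources) {
        List<String> lines = new ArrayList<>();
        for (MetadataCapable source : sources) {
            // A throwing implementation aborts the whole report here.
            lines.add(source.getClass().getSimpleName()
                    + ": " + source.supportsMetadataProjection());
        }
        return lines;
    }
}
```

With `OptOutSource` the helper simply records the answer, while with `ThrowingSource` the same harmless-looking call propagates an `IllegalStateException` to the caller. If throwing is intentional, it may be worth documenting which callers are expected never to reach this method.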
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.