davidradl commented on code in PR #2:
URL:
https://github.com/apache/flink-connector-http/pull/2#discussion_r2445251925
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java:
##########
@@ -255,4 +298,126 @@ private LookupSchemaEntry<RowData> processRow(RowField
rowField, int parentIndex
name, RowData.createFieldGetter(type1, parentIndex));
}
}
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ decodingFormat.listReadableMetadata().forEach((key, value) ->
metadataMap.put(key, value));
+
+ // according to convention, the order of the final row must be
+ // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+ // where the format metadata has highest precedence
+ // add connector metadata
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key,
m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {
+ // separate connector and format metadata
+ final List<String> connectorMetadataKeys = new
ArrayList<>(metadataKeys);
+ final Map<String, DataType> formatMetadata =
decodingFormat.listReadableMetadata();
+ // store non connector keys and remove them from the
connectorMetadataKeys.
+ List<String> formatMetadataKeys = new ArrayList<>();
+ Set<String> metadataKeysSet =
metadataKeys.stream().collect(Collectors.toSet());
+ for (ReadableMetadata rm : ReadableMetadata.values()) {
+ String metadataKeyToCheck = rm.name();
+ if (!metadataKeysSet.contains(metadataKeyToCheck)) {
+ formatMetadataKeys.add(metadataKeyToCheck);
+ connectorMetadataKeys.remove(metadataKeyToCheck);
+ }
+ }
+ // push down format metadata keys
+ if (formatMetadata.size() > 0) {
+ final List<String> requestedFormatMetadataKeys =
+ formatMetadataKeys.stream().collect(Collectors.toList());
+ decodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+ this.metadataKeys = connectorMetadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Metadata handling
+ //
--------------------------------------------------------------------------------------------
+ enum ReadableMetadata {
+ ERROR_STRING(
+ "error-string",
+ DataTypes.STRING(),
+ new MetadataConverter() {
Review Comment:
Thanks for the feedback @ferenc-csaky. I was copying the style and naming
used in the [Kafka
connector](https://github.com/apache/flink-connector-kafka/blob/cb5c5c07318ba602c6c63cb116774a12c52fc478/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L608).
I think your suggestion makes it nicer, so I will follow it.