davidradl commented on code in PR #2:
URL: https://github.com/apache/flink-connector-http/pull/2#discussion_r2445251925


##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java:
##########
@@ -255,4 +298,126 @@ private LookupSchemaEntry<RowData> processRow(RowField rowField, int parentIndex
                     name, RowData.createFieldGetter(type1, parentIndex));
         }
     }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        // add format metadata first
+        metadataMap.putAll(decodingFormat.listReadableMetadata());
+
+        // by convention, the final row must be ordered as
+        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA,
+        // with format metadata taking precedence on key clashes;
+        // add connector metadata only for keys the format has not claimed
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        // separate connector and format metadata: keys that the format has
+        // claimed (the format has precedence) are pushed down to it, and the
+        // remaining keys are handled by the connector itself
+        final Map<String, DataType> formatMetadata = decodingFormat.listReadableMetadata();
+        final List<String> formatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(formatMetadata::containsKey)
+                        .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push the format metadata keys down into the decoding format
+        if (!formatMetadata.isEmpty()) {
+            decodingFormat.applyReadableMetadata(formatMetadataKeys);
+        }
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+    enum ReadableMetadata {
+        ERROR_STRING(
+                "error-string",
+                DataTypes.STRING(),
+                new MetadataConverter() {
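
A minimal, self-contained sketch of the precedence convention in `listReadableMetadata()` above, not part of the PR diff; the `http-status` key here is made up purely for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MetadataPrecedenceSketch {
    public static void main(String[] args) {
        final Map<String, String> metadataMap = new LinkedHashMap<>();

        // format metadata goes in first, so it wins any key clash
        metadataMap.put("error-string", "format STRING");

        // connector metadata is added with putIfAbsent, so a key already
        // claimed by the format is left untouched
        metadataMap.putIfAbsent("error-string", "connector STRING");
        metadataMap.putIfAbsent("http-status", "connector INT");

        // prints {error-string=format STRING, http-status=connector INT}
        System.out.println(metadataMap);
    }
}
```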

Review Comment:
   Thanks for the feedback @ferenc-csaky. I was copying the style and naming used in the [Kafka connector](https://github.com/apache/flink-connector-kafka/blob/cb5c5c07318ba602c6c63cb116774a12c52fc478/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L608).
   
   I think your suggestion makes it nicer, so I will follow it.
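   
   For anyone following along, the pattern I copied looks roughly like this (a simplified, self-contained sketch rather than the actual Kafka connector code; the local `MetadataConverter` interface and the `String` stand-in for `DataTypes.STRING()` are placeholders):
   
   ```java
   // Simplified sketch of the enum + anonymous-converter pattern; the real
   // Kafka connector pairs each key with a Flink DataType and a converter
   // that reads the value from the consumed record.
   import java.io.Serializable;
   
   public class ReadableMetadataStyleSketch {
   
       /** Stand-in for the connector's real converter interface. */
       interface MetadataConverter extends Serializable {
           Object read(Object record);
       }
   
       /** Each constant pairs a metadata key and its type with a converter. */
       enum ReadableMetadata {
           ERROR_STRING(
                   "error-string",
                   "STRING", // stand-in for DataTypes.STRING()
                   new MetadataConverter() {
                       @Override
                       public Object read(Object record) {
                           return record == null ? null : record.toString();
                       }
                   });
   
           final String key;
           final String dataType;
           final MetadataConverter converter;
   
           ReadableMetadata(String key, String dataType, MetadataConverter converter) {
               this.key = key;
               this.dataType = dataType;
               this.converter = converter;
           }
       }
   }
   ```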
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
