ferenc-csaky commented on code in PR #2:
URL: https://github.com/apache/flink-connector-http/pull/2#discussion_r2435947048


##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java:
##########
@@ -255,4 +298,126 @@ private LookupSchemaEntry<RowData> processRow(RowField rowField, int parentIndex
                     name, RowData.createFieldGetter(type1, parentIndex));
         }
     }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        decodingFormat.listReadableMetadata().forEach((key, value) -> metadataMap.put(key, value));
+
+        // according to convention, the order of the final row must be
+        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+        // where the format metadata has highest precedence
+        // add connector metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        // separate connector and format metadata
+        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+        final Map<String, DataType> formatMetadata = decodingFormat.listReadableMetadata();
+        // store non connector keys and remove them from the connectorMetadataKeys.
+        List<String> formatMetadataKeys = new ArrayList<>();
+        Set<String> metadataKeysSet = metadataKeys.stream().collect(Collectors.toSet());
+        for (ReadableMetadata rm : ReadableMetadata.values()) {
+            String metadataKeyToCheck = rm.name();
+            if (!metadataKeysSet.contains(metadataKeyToCheck)) {
+                formatMetadataKeys.add(metadataKeyToCheck);
+                connectorMetadataKeys.remove(metadataKeyToCheck);
+            }
+        }
+        // push down format metadata keys
+        if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream().collect(Collectors.toList());
+            decodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+    enum ReadableMetadata {
+        ERROR_STRING(
+                "error-string",
+                DataTypes.STRING(),
+                new MetadataConverter() {

Review Comment:
   I think it would be worth moving all these anonymous classes from here into `MetadataConverter` as static inner classes. That interface is package-private anyway, so the classes could have proper names and this enum would become more readable.
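
   Roughly what I have in mind, just a sketch: the `read(...)` signature and the `getErrorString()` accessor below are assumptions and should be adapted to whatever the anonymous classes actually implement today.

```java
import org.apache.flink.table.data.StringData;

/** Package-private converter for connector metadata columns. */
interface MetadataConverter {

    // assumed signature, adjust to the real one
    Object read(HttpRowDataWrapper httpRowDataWrapper);

    /** Named implementation instead of an anonymous class inside the enum. */
    class ErrorStringConverter implements MetadataConverter {
        @Override
        public Object read(HttpRowDataWrapper httpRowDataWrapper) {
            // getErrorString() is hypothetical, read the real wrapper field here
            return StringData.fromString(httpRowDataWrapper.getErrorString());
        }
    }
}
```

   The enum constant could then simply be `ERROR_STRING("error-string", DataTypes.STRING(), new MetadataConverter.ErrorStringConverter())`.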



##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java:
##########
@@ -125,11 +149,30 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
     protected LookupRuntimeProvider getLookupRuntimeProvider(
             LookupRow lookupRow,
             DeserializationSchema<RowData> responseSchemaDecoder,
-            PollingClientFactory<RowData> pollingClientFactory) {
-
+            PollingClientFactory pollingClientFactory) {
+        MetadataConverter[] metadataConverters = {};
+        if (this.metadataKeys != null) {
+            metadataConverters =
+                    this.metadataKeys.stream()
+                            .map(
+                                    k ->
+                                            Stream.of(
+                                                            HttpLookupTableSource.ReadableMetadata
+                                                                    .values())
+                                                    .filter(rm -> rm.key.equals(k))
+                                                    .findFirst()
+                                                    .orElseThrow(IllegalStateException::new))
+                            .map(m -> m.converter)
+                            .toArray(MetadataConverter[]::new);
+        }

Review Comment:
   nit: I'd extract this block into a `private` method, or even two (one just for the first `.map`), to escape this crazy indentation and make it more readable :)
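
   Something along these lines for example (the helper names are just suggestions, and this relies on the existing `metadataKeys`, `ReadableMetadata` and `MetadataConverter` members of the class):

```java
private MetadataConverter[] getMetadataConverters() {
    if (metadataKeys == null) {
        return new MetadataConverter[0];
    }
    return metadataKeys.stream()
            .map(this::findReadableMetadata)
            .map(m -> m.converter)
            .toArray(MetadataConverter[]::new);
}

private ReadableMetadata findReadableMetadata(String key) {
    return Stream.of(ReadableMetadata.values())
            .filter(rm -> rm.key.equals(key))
            .findFirst()
            .orElseThrow(IllegalStateException::new);
}
```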



##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java:
##########
@@ -199,10 +248,43 @@ private Collection<RowData> processHttpResponse(
                 response.statusCode(),
                 responseBody);
 
-        if (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response)) {
-            return Collections.emptyList();
+        if (!isError
+                && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
+            return HttpRowDataWrapper.builder()
+                    .data(Collections.emptyList())
+                    .httpCompletionState(HttpCompletionState.SUCCESS)
+                    .build();
+        } else {

Review Comment:
   nit: the `else` branch can be removed, which eliminates one extra indentation level and makes this more readable.
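
   i.e. keep the early return and drop the `else`, so the rest of the method stays one indentation level shallower (same lines as in the diff, nothing else changed):

```java
if (!isError
        && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
    return HttpRowDataWrapper.builder()
            .data(Collections.emptyList())
            .httpCompletionState(HttpCompletionState.SUCCESS)
            .build();
}
// the former `else` body continues here, without the extra indentation level
```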



##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java:
##########
@@ -199,10 +248,43 @@ private Collection<RowData> processHttpResponse(
                 response.statusCode(),
                 responseBody);
 
-        if (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response)) {
-            return Collections.emptyList();
+        if (!isError
+                && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {

Review Comment:
   This is more of an opinion, but personally I'd consider this condition complex enough to extract into a method.
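
   e.g. something like this (the method name and the `HttpResponse<String>` parameter type are guesses based on the surrounding code):

```java
private boolean shouldReturnEmptyResult(
        boolean isError, String responseBody, HttpResponse<String> response) {
    return !isError
            && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response));
}
```

   The condition in `processHttpResponse` then reads as `if (shouldReturnEmptyResult(isError, responseBody, response))`.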



##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java:
##########
@@ -83,6 +95,55 @@ public void open(FunctionContext context) throws Exception {
     @Override
     public Collection<RowData> lookup(RowData keyRow) {
         localHttpCallCounter.incrementAndGet();
-        return client.pull(keyRow);
+        List<RowData> outputList = new ArrayList<>();
+        final int metadataArity = metadataConverters.length;
+
+        HttpRowDataWrapper httpRowDataWrapper = client.pull(keyRow);
+        Collection<RowData> httpCollector = httpRowDataWrapper.getData();
+
+        int physicalArity = -1;
+
+        GenericRowData producedRow = null;
+        if (httpRowDataWrapper.shouldIgnore()) {
+            return Collections.emptyList();
+        }
+        // grab the actual data if there is any from the response and populate the producedRow with
+        // it
+        if (!httpCollector.isEmpty()) {
+            // TODO original code increments again if empty - removing
+            //  if (httpCollector.isEmpty()) {
+            // localHttpCallCounter.incrementAndGet();
+            // } else {

Review Comment:
   Anything to do about this?



##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java:
##########
@@ -115,6 +116,12 @@ public class HttpLookupConnectorOptions {
                     .noDefaultValue()
                     .withDescription("Http client connection timeout.");
 
+    public static final ConfigOption<Boolean> SOURCE_LOOKUP_CONTINUE_ON_ERROR =
+            ConfigOptions.key(CONTINUE_ON_ERROR)
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Continue job on error.");

Review Comment:
   I think we should be a bit more descriptive here about what "error" actually means. The underlying logic distinguishes between non-successful HTTP responses and logic errors, but this flag covers both.
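
   For example something along these lines (exact wording up to you, this is only meant to illustrate the direction):

```java
public static final ConfigOption<Boolean> SOURCE_LOOKUP_CONTINUE_ON_ERROR =
        ConfigOptions.key(CONTINUE_ON_ERROR)
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether the lookup source should continue processing instead of "
                                + "failing the job when a lookup request ends in an error, "
                                + "i.e. a non-successful HTTP response or an error while "
                                + "processing the response.");
```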



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
