ferenc-csaky commented on code in PR #2:
URL:
https://github.com/apache/flink-connector-http/pull/2#discussion_r2435947048
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java:
##########
@@ -255,4 +298,126 @@ private LookupSchemaEntry<RowData> processRow(RowField rowField, int parentIndex
name, RowData.createFieldGetter(type1, parentIndex));
}
}
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ decodingFormat.listReadableMetadata().forEach((key, value) -> metadataMap.put(key, value));
+
+ // according to convention, the order of the final row must be
+ // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+ // where the format metadata has highest precedence
+ // add connector metadata
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ // separate connector and format metadata
+ final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+ final Map<String, DataType> formatMetadata = decodingFormat.listReadableMetadata();
+ // store non connector keys and remove them from the connectorMetadataKeys.
+ List<String> formatMetadataKeys = new ArrayList<>();
+ Set<String> metadataKeysSet = metadataKeys.stream().collect(Collectors.toSet());
+ for (ReadableMetadata rm : ReadableMetadata.values()) {
+ String metadataKeyToCheck = rm.name();
+ if (!metadataKeysSet.contains(metadataKeyToCheck)) {
+ formatMetadataKeys.add(metadataKeyToCheck);
+ connectorMetadataKeys.remove(metadataKeyToCheck);
+ }
+ }
+ // push down format metadata keys
+ if (formatMetadata.size() > 0) {
+ final List<String> requestedFormatMetadataKeys =
+ formatMetadataKeys.stream().collect(Collectors.toList());
+ decodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+ this.metadataKeys = connectorMetadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+ enum ReadableMetadata {
+ ERROR_STRING(
+ "error-string",
+ DataTypes.STRING(),
+ new MetadataConverter() {
Review Comment:
I think it would be worth moving all these anonymous classes from here into
`MetadataConverter` as static nested classes. That interface is package-private
anyway, so the classes could have proper names and this enum would become more
readable.
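A minimal sketch of that suggestion, assuming a simplified converter contract: the PR's actual `MetadataConverter` method signature is not visible in this hunk, so `read(HttpResponseContext)`, the `HttpResponseContext` holder, and the `HTTP_STATUS_CODE` entry below are hypothetical, and Flink's `DataType` is left out so the snippet compiles on its own. Member classes declared inside an interface are implicitly `public` and `static`, so they become static nested classes without any extra modifier.

```java
import java.io.Serializable;

/** Hypothetical holder for the response details a converter reads from. */
final class HttpResponseContext {
    final String errorString;
    final Integer statusCode;

    HttpResponseContext(String errorString, Integer statusCode) {
        this.errorString = errorString;
        this.statusCode = statusCode;
    }
}

/** Package-private interface; the named implementations live inside it. */
interface MetadataConverter extends Serializable {
    // Hypothetical signature; the real one may take different arguments.
    Object read(HttpResponseContext context);

    /** Implicitly public and static because it is declared in an interface. */
    final class ErrorStringConverter implements MetadataConverter {
        @Override
        public Object read(HttpResponseContext context) {
            return context.errorString;
        }
    }

    /** Hypothetical second converter, only to show the pattern scales. */
    final class StatusCodeConverter implements MetadataConverter {
        @Override
        public Object read(HttpResponseContext context) {
            return context.statusCode;
        }
    }
}

/** Each constant now fits on one line instead of carrying an anonymous class. */
enum ReadableMetadata {
    ERROR_STRING("error-string", new MetadataConverter.ErrorStringConverter()),
    HTTP_STATUS_CODE("http-status-code", new MetadataConverter.StatusCodeConverter());

    final String key;
    final MetadataConverter converter;

    ReadableMetadata(String key, MetadataConverter converter) {
        this.key = key;
        this.converter = converter;
    }
}
```

Besides shortening the enum, named classes show up with readable names in stack traces and can be referenced directly from tests.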
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java:
##########
@@ -125,11 +149,30 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
protected LookupRuntimeProvider getLookupRuntimeProvider(
LookupRow lookupRow,
DeserializationSchema<RowData> responseSchemaDecoder,
- PollingClientFactory<RowData> pollingClientFactory) {
-
+ PollingClientFactory pollingClientFactory) {
+ MetadataConverter[] metadataConverters = {};
+ if (this.metadataKeys != null) {
+ metadataConverters =
+ this.metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(
+ HttpLookupTableSource.ReadableMetadata
+ .values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(m -> m.converter)
+ .toArray(MetadataConverter[]::new);
+ }
Review Comment:
nit: I'd extract this block into a `private` method, or even two (a second one
for the lambda in the first `.map`), to escape this crazy indentation and make
it more readable :)
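One possible extraction, as a sketch: `toMetadataConverters` and `findMetadata` are hypothetical names, the converter interface is reduced to a hypothetical single `read(String)` method, and the helpers are `static` in a standalone class only so the snippet compiles on its own (inside `HttpLookupTableSource` they would be `private` methods). The lookup also attaches the unknown key to the exception, which the bare `IllegalStateException::new` does not.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical converter contract. */
interface MetadataConverter {
    Object read(String errorString);
}

enum ReadableMetadata {
    ERROR_STRING("error-string", errorString -> errorString);

    final String key;
    final MetadataConverter converter;

    ReadableMetadata(String key, MetadataConverter converter) {
        this.key = key;
        this.converter = converter;
    }
}

final class MetadataConverters {
    private MetadataConverters() {}

    /** Replaces the inline stream block: one converter per requested key, in order. */
    static MetadataConverter[] toMetadataConverters(List<String> metadataKeys) {
        if (metadataKeys == null) {
            return new MetadataConverter[0];
        }
        return metadataKeys.stream()
                .map(MetadataConverters::findMetadata)
                .map(m -> m.converter)
                .toArray(MetadataConverter[]::new);
    }

    /** Replaces the nested lambda from the first .map with a flat, named lookup. */
    private static ReadableMetadata findMetadata(String key) {
        return Arrays.stream(ReadableMetadata.values())
                .filter(rm -> rm.key.equals(key))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Unknown metadata key: " + key));
    }
}
```

With the helpers in place, the call site collapses to a single assignment such as `metadataConverters = toMetadataConverters(this.metadataKeys);`.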
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java:
##########
@@ -199,10 +248,43 @@ private Collection<RowData> processHttpResponse(
response.statusCode(),
responseBody);
- if (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response)) {
- return Collections.emptyList();
+ if (!isError
+ && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
+ return HttpRowDataWrapper.builder()
+ .data(Collections.emptyList())
+ .httpCompletionState(HttpCompletionState.SUCCESS)
+ .build();
+ } else {
Review Comment:
nit: the `else` branch can be removed, since the `if` branch already returns;
that eliminates one extra indentation level and makes it more readable.
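A sketch of the guard-clause shape, under simplified assumptions: rows are plain `String`s, `ERROR` is an assumed completion state (only `SUCCESS` appears in the diff), the error and deserialization branches are placeholders rather than the PR's logic, and `responseBody == null || responseBody.isBlank()` stands in for Flink's `StringUtils.isNullOrWhitespaceOnly`.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical subset of completion states; ERROR is assumed. */
enum HttpCompletionState { SUCCESS, ERROR }

/** Simplified stand-in for the PR's wrapper, without the builder. */
final class HttpRowDataWrapper {
    final List<String> data;
    final HttpCompletionState state;

    HttpRowDataWrapper(List<String> data, HttpCompletionState state) {
        this.data = data;
        this.state = state;
    }
}

final class ResponseProcessor {
    private ResponseProcessor() {}

    static HttpRowDataWrapper process(boolean isError, String responseBody, boolean ignoreResponse) {
        boolean blankBody = responseBody == null || responseBody.isBlank();
        if (!isError && (blankBody || ignoreResponse)) {
            // Guard clause: this branch returns, so nothing below needs an else.
            return new HttpRowDataWrapper(Collections.emptyList(), HttpCompletionState.SUCCESS);
        }
        // Former else-branch body, now one indentation level shallower.
        if (isError) {
            // Placeholder for the PR's error handling.
            return new HttpRowDataWrapper(Collections.emptyList(), HttpCompletionState.ERROR);
        }
        // Placeholder for deserialization: treat the whole body as a single row.
        return new HttpRowDataWrapper(List.of(responseBody), HttpCompletionState.SUCCESS);
    }
}
```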
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java:
##########
@@ -199,10 +248,43 @@ private Collection<RowData> processHttpResponse(
response.statusCode(),
responseBody);
- if (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response)) {
- return Collections.emptyList();
+ if (!isError
+ && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
Review Comment:
This is more of an opinion, but personally I'd consider this condition
complex enough to extract it into a method.
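For example, the condition could move into a small predicate. The name `isEmptySuccessfulResponse` is hypothetical, the `ignoreResponse(response)` call is reduced to a boolean parameter, and `responseBody == null || responseBody.isBlank()` stands in for `StringUtils.isNullOrWhitespaceOnly`.

```java
final class ResponseConditions {
    private ResponseConditions() {}

    /**
     * True when a non-error response has nothing to deserialize, either because the body
     * is null, empty, or whitespace-only, or because ignoreResponse(response) returned true.
     */
    static boolean isEmptySuccessfulResponse(
            boolean isError, String responseBody, boolean ignoreResponse) {
        boolean blankBody = responseBody == null || responseBody.isBlank();
        return !isError && (blankBody || ignoreResponse);
    }
}
```

The call site would then read `if (isEmptySuccessfulResponse(isError, responseBody, ignoreResponse(response))) { ... }`, and the predicate's name documents the intent that the inline boolean expression leaves implicit.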
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java:
##########
@@ -83,6 +95,55 @@ public void open(FunctionContext context) throws Exception {
@Override
public Collection<RowData> lookup(RowData keyRow) {
localHttpCallCounter.incrementAndGet();
- return client.pull(keyRow);
+ List<RowData> outputList = new ArrayList<>();
+ final int metadataArity = metadataConverters.length;
+
+ HttpRowDataWrapper httpRowDataWrapper = client.pull(keyRow);
+ Collection<RowData> httpCollector = httpRowDataWrapper.getData();
+
+ int physicalArity = -1;
+
+ GenericRowData producedRow = null;
+ if (httpRowDataWrapper.shouldIgnore()) {
+ return Collections.emptyList();
+ }
+ // grab the actual data if there is any from the response and populate the producedRow with
+ // it
+ if (!httpCollector.isEmpty()) {
+ // TODO original code increments again if empty - removing
+ // if (httpCollector.isEmpty()) {
+ // localHttpCallCounter.incrementAndGet();
+ // } else {
Review Comment:
Is there anything to do about this TODO?
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java:
##########
@@ -115,6 +116,12 @@ public class HttpLookupConnectorOptions {
.noDefaultValue()
.withDescription("Http client connection timeout.");
+ public static final ConfigOption<Boolean> SOURCE_LOOKUP_CONTINUE_ON_ERROR =
+ ConfigOptions.key(CONTINUE_ON_ERROR)
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Continue job on error.");
Review Comment:
I think we should be a bit more descriptive here about what "error"
actually means. The underlying logic distinguishes between non-successful HTTP
responses and logic errors, but this flag covers both.
--
This is an automated message from the Apache Git Service.