This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 8c36d97 [FLINK-38465] Add ability to continue on error, and support
metadata column
8c36d97 is described below
commit 8c36d97c2b0d83de3ebb1c0976a2da3d660f2839
Author: David Radley <[email protected]>
AuthorDate: Wed Oct 22 13:50:01 2025 +0100
[FLINK-38465] Add ability to continue on error, and support metadata column
---
.gitignore | 1 +
docs/content/docs/table/http.md | 45 +-
.../connector/http/clients/PollingClient.java | 9 +-
.../http/clients/PollingClientFactory.java | 13 +-
.../http/config/HttpConnectorConfigConstants.java | 2 +
.../connector/http/retry/RetryStrategyType.java | 3 +-
.../lookup/HttpCompletionState.java} | 23 +-
.../table/lookup/HttpLookupConnectorOptions.java | 9 +
.../http/table/lookup/HttpLookupTableSource.java | 122 +++++-
.../http/table/lookup/HttpRowDataWrapper.java | 51 +++
.../http/table/lookup/HttpTableLookupFunction.java | 68 ++-
.../table/lookup/JavaNetHttpPollingClient.java | 124 +++++-
.../lookup/JavaNetHttpPollingClientFactory.java | 2 +-
.../http/table/lookup/MetadataConverter.java | 99 +++++
.../lookup/HttpLookupTableSourceITCaseTest.java | 481 ++++++++++++++++++++-
.../table/lookup/HttpLookupTableSourceTest.java | 106 ++++-
.../http/table/lookup/HttpRowDataWrapperTest.java | 73 ++++
.../JavaNetHttpPollingClientConnectionTest.java | 14 +-
...avaNetHttpPollingClientHttpsConnectionTest.java | 2 +-
19 files changed, 1148 insertions(+), 99 deletions(-)
diff --git a/.gitignore b/.gitignore
index bbad081..649e1b7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
.gitignore.swp
.project
.settings
+.DS_Store
/.java-version
.eslintcache
.cache
diff --git a/docs/content/docs/table/http.md b/docs/content/docs/table/http.md
index 630c0f2..c5f5348 100644
--- a/docs/content/docs/table/http.md
+++ b/docs/content/docs/table/http.md
@@ -177,6 +177,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.security.oidc.token.request |
optional | OIDC `Token Request` body in `application/x-www-form-urlencoded`
encoding
[...]
| http.security.oidc.token.endpoint.url |
optional | OIDC `Token Endpoint` url, to which the token request will be issued
[...]
| http.security.oidc.token.expiry.reduction |
optional | OIDC tokens will be requested if the current time is later than the
cached token expiry time minus this value.
[...]
+| http.source.lookup.continue-on-error |
optional | When true, the flow will continue on errors, returning row content.
When false (the default) the job ends on errors.
[...]
| http.source.lookup.request.timeout |
optional | Sets HTTP request timeout in seconds. If not specified, the default
value of 30 seconds will be used.
[...]
| http.source.lookup.request.thread-pool.size |
optional | Sets the size of pool thread for HTTP lookup request processing.
Increasing this value would mean that more concurrent requests can be processed
in the same time. If not specified, the default value of 8 threads will be
used.
[...]
| http.source.lookup.response.thread-pool.size |
optional | Sets the size of pool thread for HTTP lookup response processing.
Increasing this value would mean that more concurrent requests can be processed
in the same time. If not specified, the default value of 4 threads will be
used.
[...]
@@ -297,17 +298,19 @@ The source table categorizes HTTP responses into three
groups based on status co
- Error codes:
Any response code that is not classified as a retry or success code falls
into this category. Receiving such a response will result in a job failure.
-
-### Retries (Lookup source)
+### Retries and handling errors (Lookup source)
Lookup source handles auto-retries for two scenarios:
1. IOException occurs (e.g. temporary network outage)
2. The response contains a HTTP error code that indicates a retriable error.
These codes are defined in the table configuration (see
`http.source.lookup.retry-codes`).
- Retries are executed silently, without restarting the job. After reaching
max retries attempts (per request) operation will fail and restart job.
+ Retries are executed silently, without restarting the job.
Notice that HTTP codes are categorized into 3 groups:
- successful responses - response is returned immediately for further
processing
- temporary errors - request will be retried up to the retry limit
-- error responses - unexpected responses are not retried and will fail the
job. Any HTTP error code which is not configured as successful or temporary
error is treated as an unretriable error.
+- error responses - unexpected responses are not retried. Any HTTP error code
which is not configured as successful or temporary error is treated as an
unretriable error.
+
+For temporary errors that have reached max retries attempts (per request) and
error responses, the operation will
+succeed if `http.source.lookup.continue-on-error` is true,
otherwise the job will fail.
##### Retry strategy
User can choose retry strategy type for source table:
@@ -461,7 +464,39 @@ CREATE TABLE http (
## Available Metadata
-The is no available metadata for this connector.
+The metadata column `http-status-code`, if specified in the table definition,
will get the HTTP status code.
+The metadata column `http-headers-map`, if specified in the table definition,
will get a map of the HTTP headers.
+
+HTTP requests can fail either immediately or after temporary error retries.
The usual behaviour after such failures is to end the job. If you would like to
continue
+processing after these failures then specify
`http.source.lookup.continue-on-error` as true. The lookup join will complete
without content in the expected enrichment columns from the http call;
+this means that these columns will be null for nullable columns and hold a
default value for the type for non-nullable columns.
+
+When using `http.source.lookup.continue-on-error` as true, consider adding
extra metadata columns that will surface information about failures into your
stream.
+
+Metadata columns can be specified and hold http information. They are optional
read-only columns that must be declared VIRTUAL to exclude them during an
INSERT INTO operation.
+
+| Key | Data Type | Description
|
+|-----------------------|----------------------------------|----------------------------------------|
+| error-string | STRING NULL | A message
associated with the error |
+| http-status-code | INT NULL | The HTTP status
code |
+| http-headers-map | MAP <STRING, ARRAY<STRING>> NULL | The headers
returned with the response |
+| http-completion-state | STRING NULL | The completion
state of the http call. |
+
+### http-completion-state possible values
+
+| Value | Description |
+|:------------------|------------------------|
+| SUCCESS | Success |
+| HTTP_ERROR_STATUS | HTTP error status code |
+| EXCEPTION | An Exception occurred |
+
+If the `error-string` metadata column is defined on the table and the call
succeeds then it will have a null value.
+
+When a http lookup call fails and populates the metadata columns with the
error information, the expected enrichment columns from the http call
+are not populated; this means that they will be null for nullable columns and
hold a default value for the type for non-nullable columns.
+
+If you are using the Table API `TableResult` and have an `await` with a
timeout, this Timeout exception will cause the job to terminate,
+even if there are metadata columns defined.
## HTTP status code handler
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClient.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClient.java
index 6aa9703..dbc4583 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClient.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClient.java
@@ -17,21 +17,20 @@
package org.apache.flink.connector.http.clients;
+import org.apache.flink.connector.http.table.lookup.HttpRowDataWrapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
-import java.util.Collection;
-
/** A client that is used to get enrichment data from external component. */
-public interface PollingClient<T> {
+public interface PollingClient {
/**
* Gets enrichment data from external component using provided lookup
arguments.
*
* @param lookupRow A {@link RowData} containing request parameters.
- * @return an optional result of data lookup.
+ * @return an optional result of data lookup with http information.
*/
- Collection<T> pull(RowData lookupRow);
+ HttpRowDataWrapper pull(RowData lookupRow);
/**
* Initialize the client.
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClientFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClientFactory.java
index 4874e60..7df0b14 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClientFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClientFactory.java
@@ -19,18 +19,15 @@ package org.apache.flink.connector.http.clients;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.http.table.lookup.HttpLookupConfig;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.util.ConfigurationException;
import java.io.Serializable;
-/**
- * Polling client factory.
- *
- * @param <OUT> polling client
- */
-public interface PollingClientFactory<OUT> extends Serializable {
+/** Polling client factory. */
+public interface PollingClientFactory extends Serializable {
- PollingClient<OUT> createPollClient(
- HttpLookupConfig options, DeserializationSchema<OUT> schemaDecoder)
+ PollingClient createPollClient(
+ HttpLookupConfig options, DeserializationSchema<RowData>
schemaDecoder)
throws ConfigurationException;
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 9f7721a..2715005 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -104,6 +104,8 @@ public final class HttpConnectorConfigConstants {
public static final String SOURCE_CONNECTION_TIMEOUT =
SOURCE_LOOKUP_PREFIX + "connection.timeout";
+ public static final String CONTINUE_ON_ERROR = SOURCE_LOOKUP_PREFIX +
"continue-on-error";
+
public static final String SOURCE_PROXY_HOST = SOURCE_LOOKUP_PREFIX +
"proxy.host";
public static final String SOURCE_PROXY_PORT = SOURCE_LOOKUP_PREFIX +
"proxy.port";
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/retry/RetryStrategyType.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/retry/RetryStrategyType.java
index d3d21d9..1046494 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/retry/RetryStrategyType.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/retry/RetryStrategyType.java
@@ -26,8 +26,7 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public enum RetryStrategyType {
FIXED_DELAY("fixed-delay"),
- EXPONENTIAL_DELAY("exponential-delay"),
- ;
+ EXPONENTIAL_DELAY("exponential-delay");
private final String code;
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClientFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
similarity index 58%
copy from
flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClientFactory.java
copy to
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
index 4874e60..91cfff7 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/clients/PollingClientFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
@@ -15,22 +15,11 @@
* limitations under the License.
*/
-package org.apache.flink.connector.http.clients;
+package org.apache.flink.connector.http.table.lookup;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.connector.http.table.lookup.HttpLookupConfig;
-import org.apache.flink.util.ConfigurationException;
-
-import java.io.Serializable;
-
-/**
- * Polling client factory.
- *
- * @param <OUT> polling client
- */
-public interface PollingClientFactory<OUT> extends Serializable {
-
- PollingClient<OUT> createPollClient(
- HttpLookupConfig options, DeserializationSchema<OUT> schemaDecoder)
- throws ConfigurationException;
+/** These are the possible ways that the http call can complete. */
+public enum HttpCompletionState {
+ HTTP_ERROR_STATUS,
+ EXCEPTION,
+ SUCCESS
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
index 849e350..305c3cf 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
@@ -23,6 +23,7 @@ import
org.apache.flink.connector.http.retry.RetryStrategyType;
import java.time.Duration;
+import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.CONTINUE_ON_ERROR;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_ENDPOINT_URL;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_EXPIRY_REDUCTION;
@@ -115,6 +116,14 @@ public class HttpLookupConnectorOptions {
.noDefaultValue()
.withDescription("Http client connection timeout.");
+ public static final ConfigOption<Boolean> SOURCE_LOOKUP_CONTINUE_ON_ERROR =
+ ConfigOptions.key(CONTINUE_ON_ERROR)
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Continue job on error. "
+ + "This includes unsuccessful HTTP status
codes and client side Exceptions, such as Connection Refused.");
+
public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
ConfigOptions.key(SOURCE_PROXY_HOST)
.stringType()
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java
index 810dd5d..8d9a601 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSource.java
@@ -34,6 +34,7 @@ import
org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
@@ -54,7 +55,12 @@ import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOOKUP_QUERY_CREATOR_IDENTIFIER;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceFactory.row;
@@ -62,7 +68,10 @@ import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSource
/** http lookup table source. */
@Slf4j
public class HttpLookupTableSource
- implements LookupTableSource, SupportsProjectionPushDown,
SupportsLimitPushDown {
+ implements LookupTableSource,
+ SupportsReadingMetadata,
+ SupportsProjectionPushDown,
+ SupportsLimitPushDown {
private DataType physicalRowDataType;
@@ -73,6 +82,16 @@ public class HttpLookupTableSource
private final DecodingFormat<DeserializationSchema<RowData>>
decodingFormat;
@Nullable private final LookupCache cache;
+ //
--------------------------------------------------------------------------------------------
+ // Mutable attributes
+ //
--------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
public HttpLookupTableSource(
DataType physicalRowDataType,
HttpLookupConfig lookupConfig,
@@ -116,7 +135,7 @@ public class HttpLookupTableSource
lookupQueryCreatorFactory.createLookupQueryCreator(
readableConfig, lookupRow, dynamicTableFactoryContext);
- PollingClientFactory<RowData> pollingClientFactory =
+ PollingClientFactory pollingClientFactory =
createPollingClientFactory(lookupQueryCreator, lookupConfig);
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder,
pollingClientFactory);
@@ -125,11 +144,20 @@ public class HttpLookupTableSource
protected LookupRuntimeProvider getLookupRuntimeProvider(
LookupRow lookupRow,
DeserializationSchema<RowData> responseSchemaDecoder,
- PollingClientFactory<RowData> pollingClientFactory) {
+ PollingClientFactory pollingClientFactory) {
+ MetadataConverter[] metadataConverters = {};
+ if (this.metadataKeys != null) {
+ metadataConverters = createMetadataConverters(this.metadataKeys);
+ }
HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
- pollingClientFactory, responseSchemaDecoder,
lookupRow, lookupConfig);
+ pollingClientFactory,
+ responseSchemaDecoder,
+ lookupRow,
+ lookupConfig,
+ metadataConverters,
+ this.producedDataType);
if (lookupConfig.isUseAsync()) {
AsyncLookupFunction asyncLookupFunction =
new AsyncHttpTableLookupFunction(dataLookupFunction);
@@ -174,7 +202,21 @@ public class HttpLookupTableSource
return true;
}
- private PollingClientFactory<RowData> createPollingClientFactory(
+ private ReadableMetadata findReadableMetadataByKey(String key) {
+ return Stream.of(HttpLookupTableSource.ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(key))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new);
+ }
+
+ private MetadataConverter[] createMetadataConverters(List<String>
metadataKeys) {
+ return metadataKeys.stream()
+ .map(this::findReadableMetadataByKey)
+ .map(m -> m.converter)
+ .toArray(MetadataConverter[]::new);
+ }
+
+ private PollingClientFactory createPollingClientFactory(
LookupQueryCreator lookupQueryCreator, HttpLookupConfig
lookupConfig) {
HeaderPreprocessor headerPreprocessor =
@@ -255,4 +297,74 @@ public class HttpLookupTableSource
name, RowData.createFieldGetter(type1, parentIndex));
}
}
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ decodingFormat.listReadableMetadata().forEach((key, value) ->
metadataMap.put(key, value));
+
+ // according to convention, the order of the final row must be
+ // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+ // where the format metadata has highest precedence
+ // add connector metadata
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key,
m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {
+ // separate connector and format metadata
+ final List<String> connectorMetadataKeys = new
ArrayList<>(metadataKeys);
+ final Map<String, DataType> formatMetadata =
decodingFormat.listReadableMetadata();
+ // store non connector keys and remove them from the
connectorMetadataKeys.
+ List<String> formatMetadataKeys = new ArrayList<>();
+ Set<String> metadataKeysSet =
metadataKeys.stream().collect(Collectors.toSet());
+ for (ReadableMetadata rm : ReadableMetadata.values()) {
+ String metadataKeyToCheck = rm.name();
+ if (!metadataKeysSet.contains(metadataKeyToCheck)) {
+ formatMetadataKeys.add(metadataKeyToCheck);
+ connectorMetadataKeys.remove(metadataKeyToCheck);
+ }
+ }
+ // push down format metadata keys
+ if (formatMetadata.size() > 0) {
+ final List<String> requestedFormatMetadataKeys =
+ formatMetadataKeys.stream().collect(Collectors.toList());
+ decodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+ this.metadataKeys = connectorMetadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Metadata handling
+ //
--------------------------------------------------------------------------------------------
+ enum ReadableMetadata {
+ ERROR_STRING(
+ "error-string", DataTypes.STRING(), new
MetadataConverter.ErrorStringConverter()),
+ HTTP_STATUS_CODE(
+ "http-status-code",
+ DataTypes.INT(),
+ new MetadataConverter.HttpStatusCodeConverter()),
+ HTTP_HEADERS(
+ "http-headers",
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.ARRAY(DataTypes.STRING())),
+ new MetadataConverter.HttpHeadersConverter()),
+ HTTP_COMPLETION_STATE(
+ "http-completion-state",
+ DataTypes.STRING(),
+ new MetadataConverter.HttpCompletionStateConverter());
+
+ final String key;
+ final DataType dataType;
+ final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, MetadataConverter
converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpRowDataWrapper.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpRowDataWrapper.java
new file mode 100644
index 0000000..c326743
--- /dev/null
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpRowDataWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.table.lookup;
+
+import org.apache.flink.table.data.RowData;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This bean contains the RowData information (the response body as a flink
RowData). It also
+ * contains information from the http response, namely the http headers map
and the http status code
+ * where available. The extra information is for the metadata columns.
+ */
+@Builder
+@Data
+public class HttpRowDataWrapper {
+ private final Collection<RowData> data;
+ private final String errorMessage;
+ private final Map<String, List<String>> httpHeadersMap;
+ private final Integer httpStatusCode;
+ private final HttpCompletionState httpCompletionState;
+
+ public boolean shouldIgnore() {
+ return (this.data != null
+ && this.data.isEmpty()
+ && this.errorMessage == null
+ && this.httpHeadersMap == null
+ && this.httpStatusCode == null
+ && httpCompletionState == HttpCompletionState.SUCCESS);
+ }
+}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
index d88e1a6..750e883 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
@@ -22,22 +22,29 @@ import
org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.http.clients.PollingClient;
import org.apache.flink.connector.http.clients.PollingClientFactory;
import org.apache.flink.connector.http.utils.SerializationSchemaUtils;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** lookup function. */
@Slf4j
public class HttpTableLookupFunction extends LookupFunction {
- private final PollingClientFactory<RowData> pollingClientFactory;
+ private final PollingClientFactory pollingClientFactory;
private final DeserializationSchema<RowData> responseSchemaDecoder;
@@ -50,19 +57,24 @@ public class HttpTableLookupFunction extends LookupFunction
{
private final HttpLookupConfig options;
private transient AtomicInteger localHttpCallCounter;
-
- private transient PollingClient<RowData> client;
+ private final DataType producedDataType;
+ private transient PollingClient client;
+ private final MetadataConverter[] metadataConverters;
public HttpTableLookupFunction(
- PollingClientFactory<RowData> pollingClientFactory,
+ PollingClientFactory pollingClientFactory,
DeserializationSchema<RowData> responseSchemaDecoder,
LookupRow lookupRow,
- HttpLookupConfig options) {
+ HttpLookupConfig options,
+ MetadataConverter[] metadataConverters,
+ DataType producedDataType) {
this.pollingClientFactory = pollingClientFactory;
this.responseSchemaDecoder = responseSchemaDecoder;
this.lookupRow = lookupRow;
this.options = options;
+ this.metadataConverters = metadataConverters;
+ this.producedDataType = producedDataType;
}
@Override
@@ -83,6 +95,50 @@ public class HttpTableLookupFunction extends LookupFunction {
@Override
public Collection<RowData> lookup(RowData keyRow) {
localHttpCallCounter.incrementAndGet();
- return client.pull(keyRow);
+ List<RowData> outputList = new ArrayList<>();
+ final int metadataArity = metadataConverters.length;
+
+ HttpRowDataWrapper httpRowDataWrapper = client.pull(keyRow);
+ Collection<RowData> httpCollector = httpRowDataWrapper.getData();
+
+ int physicalArity = -1;
+
+ GenericRowData producedRow = null;
+ if (httpRowDataWrapper.shouldIgnore()) {
+ return Collections.emptyList();
+ }
+ // grab the actual data if there is any from the response and populate
the producedRow with
+ // it
+ if (!httpCollector.isEmpty()) {
+ GenericRowData physicalRow = (GenericRowData)
httpCollector.iterator().next();
+ physicalArity = physicalRow.getArity();
+ producedRow =
+ new GenericRowData(physicalRow.getRowKind(), physicalArity
+ metadataArity);
+ // We need to copy in the physical row into the producedRow
+ for (int pos = 0; pos < physicalArity; pos++) {
+ producedRow.setField(pos, physicalRow.getField(pos));
+ }
+ }
+ // if we did not get the physical arity from the http response
physical row then get it from
+ // the producedDataType, which is set when we have metadata
+ if (physicalArity == -1 && producedDataType != null) {
+ List<LogicalType> childrenLogicalTypes =
+ producedDataType.getLogicalType().getChildren();
+ physicalArity = childrenLogicalTypes.size() - metadataArity;
+ }
+ // if there was no data, create an empty producedRow
+ if (producedRow == null) {
+ producedRow = new GenericRowData(RowKind.INSERT, physicalArity +
metadataArity);
+ }
+ // add any metadata
+ if (producedDataType != null) {
+ for (int metadataPos = 0; metadataPos < metadataArity;
metadataPos++) {
+ producedRow.setField(
+ physicalArity + metadataPos,
+
metadataConverters[metadataPos].read(httpRowDataWrapper));
+ }
+ }
+ outputList.add(producedRow);
+ return outputList;
}
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
index 4e904e8..53be575 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.http.HttpPostRequestCallback;
+import org.apache.flink.connector.http.HttpStatusCodeValidationFailedException;
import org.apache.flink.connector.http.clients.PollingClient;
import org.apache.flink.connector.http.preprocessor.HeaderPreprocessor;
import org.apache.flink.connector.http.retry.HttpClientWithRetry;
@@ -56,6 +57,7 @@ import java.util.Optional;
import java.util.Set;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.RESULT_TYPE;
+import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_CONTINUE_ON_ERROR;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES;
@@ -66,7 +68,7 @@ import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOp
* implementation supports HTTP traffic only.
*/
@Slf4j
-public class JavaNetHttpPollingClient implements PollingClient<RowData> {
+public class JavaNetHttpPollingClient implements PollingClient {
private static final String RESULT_TYPE_SINGLE_VALUE = "single-value";
private static final String RESULT_TYPE_ARRAY = "array";
@@ -78,6 +80,7 @@ public class JavaNetHttpPollingClient implements
PollingClient<RowData> {
private final HttpPostRequestCallback<HttpLookupSourceRequestEntry>
httpPostRequestCallback;
private final HttpLookupConfig options;
private final Set<Integer> ignoredErrorCodes;
+ private final boolean continueOnError;
public JavaNetHttpPollingClient(
HttpClient httpClient,
@@ -100,6 +103,7 @@ public class JavaNetHttpPollingClient implements
PollingClient<RowData> {
var successCodes = new HashSet<Integer>();
successCodes.addAll(HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_SUCCESS_CODES)));
successCodes.addAll(ignoredErrorCodes);
+ this.continueOnError = config.get(SOURCE_LOOKUP_CONTINUE_ON_ERROR);
this.httpClient =
HttpClientWithRetry.builder()
@@ -114,9 +118,12 @@ public class JavaNetHttpPollingClient implements
PollingClient<RowData> {
}
@Override
- public Collection<RowData> pull(RowData lookupRow) {
+ public HttpRowDataWrapper pull(RowData lookupRow) {
if (lookupRow == null) {
- return Collections.emptyList();
+ return HttpRowDataWrapper.builder()
+ .data(Collections.emptyList())
+ .httpCompletionState(HttpCompletionState.SUCCESS)
+ .build();
}
try {
log.debug("Collection<RowData> pull with Rowdata={}.", lookupRow);
@@ -126,16 +133,49 @@ public class JavaNetHttpPollingClient implements
PollingClient<RowData> {
}
}
- private Collection<RowData> queryAndProcess(RowData lookupData) throws
Exception {
+ private HttpRowDataWrapper queryAndProcess(RowData lookupData) throws
Exception {
var request = requestFactory.buildLookupRequest(lookupData);
var oidcProcessor =
HttpHeaderUtils.createOIDCHeaderPreprocessor(options.getReadableConfig());
- var response =
- httpClient.send(
- () -> updateHttpRequestIfRequired(request,
oidcProcessor),
- BodyHandlers.ofString());
- return processHttpResponse(response, request);
+ HttpResponse<String> response = null;
+ HttpRowDataWrapper httpRowDataWrapper = null;
+ try {
+ response =
+ httpClient.send(
+ () -> updateHttpRequestIfRequired(request,
oidcProcessor),
+ BodyHandlers.ofString());
+ } catch (HttpStatusCodeValidationFailedException e) {
+ // Case 1 http non successful response
+ if (!this.continueOnError) {
+ throw e;
+ }
+ // use the response in the Exception
+ response = (HttpResponse<String>) e.getResponse();
+ httpRowDataWrapper = processHttpResponse(response, request, true);
+ } catch (Exception e) {
+ // Case 2 Exception occurred
+ if (!this.continueOnError) {
+ throw e;
+ }
+ String errMessage = e.getMessage();
+ // some exceptions do not have messages including the
java.net.ConnectException we can
+ // get here if the connection is bad.
+ if (errMessage == null) {
+ errMessage = e.toString();
+ }
+ return HttpRowDataWrapper.builder()
+ .data(Collections.emptyList())
+ .errorMessage(errMessage)
+ .httpCompletionState(HttpCompletionState.EXCEPTION)
+ .build();
+ }
+ if (httpRowDataWrapper == null) {
+ // Case 3 Successful path.
+ httpRowDataWrapper = processHttpResponse(response, request, false);
+ }
+
+ return httpRowDataWrapper;
}
/**
@@ -186,8 +226,17 @@ public class JavaNetHttpPollingClient implements
PollingClient<RowData> {
return httpRequest;
}
- private Collection<RowData> processHttpResponse(
- HttpResponse<String> response, HttpLookupSourceRequestEntry
request)
+ /**
+ * Process the http response.
+ *
+ * @param response http response
+ * @param request http request
+ * @param isError whether the http response is an error (i.e. not
successful after the retry
+ * processing and accounting for the config)
+ * @return HttpRowDataWrapper http row information and http error
information
+ */
+ private HttpRowDataWrapper processHttpResponse(
+ HttpResponse<String> response, HttpLookupSourceRequestEntry
request, boolean isError)
throws IOException {
this.httpPostRequestCallback.call(response, request, "endpoint",
Collections.emptyMap());
@@ -199,10 +248,41 @@ public class JavaNetHttpPollingClient implements
PollingClient<RowData> {
response.statusCode(),
responseBody);
- if (StringUtils.isNullOrWhitespaceOnly(responseBody) ||
ignoreResponse(response)) {
- return Collections.emptyList();
+ if (this.isSuccessWithNoData(isError, responseBody, response)) {
+ return HttpRowDataWrapper.builder()
+ .data(Collections.emptyList())
+ .httpCompletionState(HttpCompletionState.SUCCESS)
+ .build();
+ }
+ if (isError) {
+ return HttpRowDataWrapper.builder()
+ .data(Collections.emptyList())
+ .errorMessage(responseBody)
+ .httpHeadersMap(response.headers().map())
+ .httpStatusCode(response.statusCode())
+ .httpCompletionState(HttpCompletionState.HTTP_ERROR_STATUS)
+ .build();
+ } else {
+ Collection<RowData> rowData = Collections.emptyList();
+ HttpCompletionState httpCompletionState =
HttpCompletionState.SUCCESS;
+ String errMessage = null;
+ try {
+ rowData = deserialize(responseBody);
+ } catch (IOException e) {
+ if (!this.continueOnError) {
+ throw e;
+ }
+ httpCompletionState = HttpCompletionState.EXCEPTION;
+ errMessage = e.getMessage();
+ }
+ return HttpRowDataWrapper.builder()
+ .data(rowData)
+ .errorMessage(errMessage)
+ .httpHeadersMap(response.headers().map())
+ .httpStatusCode(response.statusCode())
+ .httpCompletionState(httpCompletionState)
+ .build();
}
- return deserialize(responseBody);
}
@VisibleForTesting
@@ -210,6 +290,22 @@ public class JavaNetHttpPollingClient implements
PollingClient<RowData> {
return this.requestFactory;
}
+    /**
+     * Tells whether a response should be reported as a successful completion that carries no
+     * rows. This is never the case for an error response. For a non-error response it applies
+     * when the body is null or whitespace only (for example no match on the lookup join key), or
+     * when {@code ignoreResponse} says the response is to be ignored.
+     *
+     * @param isError whether there has been an error at the http processing level
+     * @param responseBody http response body
+     * @param response http response, consulted only when the body is not blank
+     * @return {@code true} if processing should complete successfully with no data
+     */
+    private boolean isSuccessWithNoData(
+            boolean isError, String responseBody, HttpResponse<String> response) {
+        if (isError) {
+            return false;
+        }
+        if (StringUtils.isNullOrWhitespaceOnly(responseBody)) {
+            return true;
+        }
+        return ignoreResponse(response);
+    }
+
private Collection<RowData> deserialize(String responseBody) throws
IOException {
byte[] rawBytes = responseBody.getBytes();
String resultType =
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientFactory.java
index e002532..973910a 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.ConfigurationException;
import java.net.http.HttpClient;
/** JavaNetHttpPollingClientFactory. */
-public class JavaNetHttpPollingClientFactory implements
PollingClientFactory<RowData> {
+public class JavaNetHttpPollingClientFactory implements PollingClientFactory {
private final HttpRequestFactory requestFactory;
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/MetadataConverter.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/MetadataConverter.java
new file mode 100644
index 0000000..7a37883
--- /dev/null
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/MetadataConverter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.table.lookup;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.StringData;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads one metadata value out of a {@link HttpRowDataWrapper}. Each implementation picks out the
+ * value for a single metadata column and returns {@code null} when the wrapper, or the requested
+ * value inside it, is absent.
+ */
+interface MetadataConverter extends Serializable {
+    /**
+     * Reads the metadata value handled by this converter.
+     *
+     * @param httpRowDataWrapper an object that contains all metadata content, may be {@code null}
+     * @return the metadata value for this MetadataConverter, or {@code null} if it is not present
+     */
+    Object read(HttpRowDataWrapper httpRowDataWrapper);
+
+    /** Returns the wrapper's error message converted with {@link StringData#fromString}. */
+    class ErrorStringConverter implements MetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(HttpRowDataWrapper httpRowDataWrapper) {
+            if (httpRowDataWrapper == null) {
+                return null;
+            }
+            return StringData.fromString(httpRowDataWrapper.getErrorMessage());
+        }
+    }
+
+    /** Returns the wrapper's HTTP status code unchanged. */
+    class HttpStatusCodeConverter implements MetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(HttpRowDataWrapper httpRowDataWrapper) {
+            return (httpRowDataWrapper != null) ? httpRowDataWrapper.getHttpStatusCode() : null;
+        }
+    }
+
+    /**
+     * Converts the wrapper's HTTP headers into a {@link GenericMapData} that maps each header name
+     * to an array of its values.
+     */
+    class HttpHeadersConverter implements MetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(HttpRowDataWrapper httpRowDataWrapper) {
+            if (httpRowDataWrapper == null) {
+                return null;
+            }
+            Map<String, List<String>> httpHeadersMap = httpRowDataWrapper.getHttpHeadersMap();
+            if (httpHeadersMap == null) {
+                return null;
+            }
+            Map<StringData, ArrayData> stringDataMap = new HashMap<>();
+            // Iterate over the entries so each header needs a single map access.
+            for (Map.Entry<String, List<String>> header : httpHeadersMap.entrySet()) {
+                List<StringData> values = new ArrayList<>(header.getValue().size());
+                for (String value : header.getValue()) {
+                    values.add(StringData.fromString(value));
+                }
+                stringDataMap.put(
+                        StringData.fromString(header.getKey()),
+                        new GenericArrayData(values.toArray()));
+            }
+            return new GenericMapData(stringDataMap);
+        }
+    }
+
+    /** Returns the name of the wrapper's {@link HttpCompletionState}. */
+    class HttpCompletionStateConverter implements MetadataConverter {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(HttpRowDataWrapper httpRowDataWrapper) {
+            if (httpRowDataWrapper == null) {
+                return null;
+            }
+            // A wrapper built without a completion state yields null, as the other converters
+            // do for their missing values, instead of failing with a NullPointerException.
+            HttpCompletionState completionState = httpRowDataWrapper.getHttpCompletionState();
+            if (completionState == null) {
+                return null;
+            }
+            return StringData.fromString(completionState.name());
+        }
+    }
+}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
index a1c4d15..ade60a0 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -42,6 +42,8 @@ import
com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.matching.StringValuePattern;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
+import lombok.Builder;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -49,10 +51,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -714,8 +719,9 @@ class HttpLookupTableSourceITCaseTest {
// For testing the
http.source.lookup.use-raw-authorization-header
// configuration parameter:
expectedAuthHeaderValue != null ? "Authorization" : null,
- expectedAuthHeaderValue // expected value of extra header
- );
+ expectedAuthHeaderValue, // expected value of extra header
+ null,
+ false);
String fields =
" `nestedRow` ROW<"
@@ -979,6 +985,43 @@ class HttpLookupTableSourceITCaseTest {
private SortedSet<Row> testLookupJoin(String lookupTable, int maxRows)
throws Exception {
+ createLookupAndSourceTables(lookupTable, maxRows);
+
+ // WHEN
+ // SQL query that performs JOIN on both tables.
+ String joinQuery =
+ "SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance FROM
Orders AS o "
+ + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS
c "
+ + "ON o.id = c.id "
+ + "AND o.id2 = c.id2";
+
+ TableResult result = tEnv.executeSql(joinQuery);
+ result.await(15, TimeUnit.SECONDS);
+
+ // THEN
+ return getCollectedRows(result);
+ }
+
+    /**
+     * Creates the source and lookup tables, runs the lookup join selecting the physical columns
+     * plus the four metadata columns (errStr, statusCode, headers, completionState) and returns
+     * the collected rows.
+     *
+     * @param lookupTable DDL of the lookup table
+     * @param maxRows passed through to the table creation
+     * @return rows collected from the join result
+     */
+    private SortedSet<Row> testLookupJoinWithMetadata(String lookupTable, int maxRows)
+            throws Exception {
+        // GIVEN
+        createLookupAndSourceTables(lookupTable, maxRows);
+
+        final String selectClause =
+                "SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance, "
+                        + "c.errStr, c.statusCode, c.headers, c.completionState FROM Orders AS o ";
+        final String joinClause =
+                "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c "
+                        + "ON o.id = c.id "
+                        + "AND o.id2 = c.id2";
+
+        // WHEN / THEN
+        return getCollectedRows(tEnv.executeSql(selectClause + joinClause));
+    }
+
+ private void createLookupAndSourceTables(String lookupTable, int maxRows) {
String sourceTable =
"CREATE TABLE Orders ("
+ "id STRING,"
@@ -1001,23 +1044,29 @@ class HttpLookupTableSourceITCaseTest {
tEnv.executeSql(sourceTable);
tEnv.executeSql(lookupTable);
+ }
- // WHEN
- // SQL query that performs JOIN on both tables.
- String joinQuery =
- "SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance FROM
Orders AS o "
- + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS
c "
- + "ON o.id = c.id "
- + "AND o.id2 = c.id2";
-
- TableResult result = tEnv.executeSql(joinQuery);
- result.await(15, TimeUnit.SECONDS);
-
- // THEN
- return getCollectedRows(result);
+    /**
+     * Picks the row assertions that match the spec. The first flag that is set wins, in the
+     * order: bad status, deserialization error, connection error, metadata, plain lookup.
+     */
+    private void assertResultsForSpec(TestSpec spec, Collection<Row> rows) {
+        if (spec.badStatus) {
+            assertEnrichedRowsNoDataBadStatus(rows);
+            return;
+        }
+        if (spec.deserError) {
+            assertEnrichedRowsDeserException(rows);
+            return;
+        }
+        if (spec.connectionError) {
+            assertEnrichedRowsException(rows);
+            return;
+        }
+        if (spec.useMetadata) {
+            assertEnrichedRows(rows, true);
+            return;
+        }
+        assertEnrichedRows(rows);
+    }
private void assertEnrichedRows(Collection<Row> collectedRows) {
+ assertEnrichedRows(collectedRows, false);
+ }
+
+ private void assertEnrichedRows(Collection<Row> collectedRows, boolean
withMetadata) {
+
+ final int rowArity = withMetadata ? 10 : 6;
// validate every row and its column.
assertAll(
() -> {
@@ -1025,9 +1074,8 @@ class HttpLookupTableSourceITCaseTest {
int intElement = 0;
for (Row row : collectedRows) {
intElement++;
- assertThat(row.getArity()).isEqualTo(6);
-
- // "id" nad "id2" columns should be different for
every row.
+ assertThat(row.getArity()).isEqualTo(rowArity);
+ // "id" and "id2" columns should be different for
every row.
assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
@@ -1035,6 +1083,102 @@ class HttpLookupTableSourceITCaseTest {
.isEqualTo("fbb68a46-80a9-46da-9d40-314b5287079c");
assertThat(row.getField("isActive")).isEqualTo(true);
assertThat(row.getField("balance")).isEqualTo("$1,729.34");
+ if (withMetadata) {
+ assertThat(row.getField("errStr")).isNull();
+ assertThat(row.getField("headers")).isNotNull();
+
assertThat(row.getField("statusCode")).isEqualTo(200);
+ assertThat(row.getField("completionState"))
+
.isEqualTo(HttpCompletionState.SUCCESS.name());
+ }
+ }
+ });
+ }
+
+    /**
+     * Asserts the rows expected for the bad-status specs: four rows of ten columns whose lookup
+     * columns are null and whose metadata reports status 500, an empty error string, non-null
+     * headers and the {@code HTTP_ERROR_STATUS} completion state.
+     */
+    private void assertEnrichedRowsNoDataBadStatus(Collection<Row> collectedRows) {
+
+        final int rowArity = 10;
+
+        // validate every row and its column.
+        assertAll(
+                () -> {
+                    assertThat(collectedRows.size()).isEqualTo(4);
+                    int intElement = 0;
+                    for (Row row : collectedRows) {
+                        intElement++;
+                        assertThat(row.getArity()).isEqualTo(rowArity);
+                        // "id" and "id2" columns should be different for every row.
+                        assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
+                        assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
+                        assertThat(row.getField("uuid")).isNull();
+                        assertThat(row.getField("isActive")).isNull();
+                        assertThat(row.getField("balance")).isNull();
+                        // metadata
+                        assertThat(row.getField("errStr")).isEqualTo("");
+                        assertThat(row.getField("headers")).isNotNull();
+                        assertThat(row.getField("statusCode")).isEqualTo(500);
+                        // AssertJ keeps actual/expected in the right order in failure messages.
+                        assertThat(row.getField("completionState"))
+                                .isEqualTo(HttpCompletionState.HTTP_ERROR_STATUS.name());
+                    }
+                });
+    }
+
+    /**
+     * Asserts the rows expected for the deserialization-error specs: four rows of ten columns
+     * whose lookup columns are null and whose metadata reports status 200, the deserialization
+     * failure message, non-null headers and the {@code EXCEPTION} completion state.
+     */
+    private void assertEnrichedRowsDeserException(Collection<Row> collectedRows) {
+
+        final int rowArity = 10;
+
+        // validate every row and its column.
+        assertAll(
+                () -> {
+                    assertThat(collectedRows.size()).isEqualTo(4);
+                    int intElement = 0;
+                    for (Row row : collectedRows) {
+                        intElement++;
+                        assertThat(row.getArity()).isEqualTo(rowArity);
+                        // "id" and "id2" columns should be different for every row.
+                        assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
+                        assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
+                        assertThat(row.getField("uuid")).isNull();
+                        assertThat(row.getField("isActive")).isNull();
+                        assertThat(row.getField("balance")).isNull();
+                        // metadata
+                        assertThat(row.getField("errStr"))
+                                .isEqualTo(
+                                        "Failed to deserialize JSON 'A test string that is not json'.");
+                        assertThat(row.getField("headers")).isNotNull();
+                        assertThat(row.getField("statusCode")).isEqualTo(200);
+                        // AssertJ keeps actual/expected in the right order in failure messages.
+                        assertThat(row.getField("completionState"))
+                                .isEqualTo(HttpCompletionState.EXCEPTION.name());
+                    }
+                });
+    }
+
+    /**
+     * Asserts the rows expected for the connection-error specs: four rows of ten columns whose
+     * lookup columns are null, with a non-null error string, no headers, no status code and the
+     * {@code EXCEPTION} completion state.
+     */
+    private void assertEnrichedRowsException(Collection<Row> collectedRows) {
+
+        final int rowArity = 10;
+
+        // validate every row and its column.
+        assertAll(
+                () -> {
+                    assertThat(collectedRows.size()).isEqualTo(4);
+                    int intElement = 0;
+                    for (Row row : collectedRows) {
+                        intElement++;
+                        assertThat(row.getArity()).isEqualTo(rowArity);
+                        // "id" and "id2" columns should be different for every row.
+                        assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
+                        assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
+                        assertThat(row.getField("uuid")).isNull();
+                        assertThat(row.getField("isActive")).isNull();
+                        assertThat(row.getField("balance")).isNull();
+                        // metadata
+                        assertThat(row.getField("errStr")).isNotNull();
+                        assertThat(row.getField("headers")).isNull();
+                        assertThat(row.getField("statusCode")).isNull();
+                        // AssertJ keeps actual/expected in the right order in failure messages.
+                        assertThat(row.getField("completionState"))
+                                .isEqualTo(HttpCompletionState.EXCEPTION.name());
                     }
                 });
     }
@@ -1081,7 +1225,25 @@ class HttpLookupTableSourceITCaseTest {
String methodName,
WireMockServer wireMockServer,
List<StringValuePattern> matchingJsonPaths) {
- setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths,
null, null);
+ setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths,
null, null, null, false);
+ }
+
+ /**
+ * Convenience overload for the bad-status case: delegates to the full overload with no extra
+ * header expectation and with the deserialization-error flag off.
+ *
+ * @param methodName HTTP method of the stubbed request
+ * @param wireMockServer server the stub is registered on
+ * @param matchingJsonPaths patterns the request body has to match
+ * @param badStatus when non-null the full overload answers with an error status instead of
+ * the normal JSON response
+ */
+ private void setUpServerBodyStub(
+ String methodName,
+ WireMockServer wireMockServer,
+ List<StringValuePattern> matchingJsonPaths,
+ Integer badStatus) {
+ setUpServerBodyStub(
+ methodName, wireMockServer, matchingJsonPaths, null, null,
badStatus, false);
+ }
+
+ /**
+ * Convenience overload for the deserialization-error case: delegates to the full overload with
+ * no extra header expectation and no bad status.
+ *
+ * @param methodName HTTP method of the stubbed request
+ * @param wireMockServer server the stub is registered on
+ * @param matchingJsonPaths patterns the request body has to match
+ * @param isDeserErr when true the full overload answers with a body that is not JSON
+ */
+ private void setUpServerBodyStub(
+ String methodName,
+ WireMockServer wireMockServer,
+ List<StringValuePattern> matchingJsonPaths,
+ boolean isDeserErr) {
+ setUpServerBodyStub(
+ methodName, wireMockServer, matchingJsonPaths, null, null,
null, isDeserErr);
}
private void setUpServerBodyStub(
@@ -1089,7 +1251,9 @@ class HttpLookupTableSourceITCaseTest {
WireMockServer wireMockServer,
List<StringValuePattern> matchingJsonPaths,
String extraHeader,
- String expectedExtraHeaderValue) {
+ String expectedExtraHeaderValue,
+ Integer badStatus,
+ boolean isDeserErr) {
MappingBuilder methodStub =
(methodName.equalsIgnoreCase("PUT")
@@ -1108,11 +1272,284 @@ class HttpLookupTableSourceITCaseTest {
for (StringValuePattern pattern : matchingJsonPaths) {
methodStub.withRequestBody(pattern);
}
-
-
methodStub.willReturn(aResponse().withTransformers(JsonTransform.NAME));
+ if (badStatus == null) {
+ if (isDeserErr) {
+ methodStub.willReturn(
+ aResponse()
+ .withBody("A test string that is not json")
+ .withStatus(200)
+ .withHeader("Content-Type", "text/plain"));
+ } else {
+ methodStub.willReturn(
+ aResponse()
+ .withTransformers(JsonTransform.NAME)
+ .withHeader("Content-Type",
"application/json"));
+ }
+ } else {
+ methodStub.willReturn(
+ aResponse()
+ .withBody(new byte[0])
+ .withStatus(500)
+ .withHeader("Content-Type", "text/plain"));
+ }
StubMapping stubMapping = wireMockServer.stubFor(methodStub);
wireMockServer.addStubMapping(stubMapping);
}
+
+    /**
+     * Runs the lookup join for every spec from {@link #testSpecProvider()} and checks the
+     * collected rows. The join may only fail when the spec injects an error (connection,
+     * deserialization or bad status) and continue-on-error is switched off; a failure in any
+     * other spec fails the test.
+     */
+    @ParameterizedTest
+    @MethodSource("testSpecProvider")
+    void testHttpLookupJoinParameterized(TestSpec spec) throws Exception {
+        // GIVEN
+        setupServerStubForSpec(spec);
+        String lookupTable = createLookupTableSql(spec);
+
+        boolean errorInjected = spec.connectionError || spec.deserError || spec.badStatus;
+        boolean failureAllowed = errorInjected && !spec.continueOnError;
+
+        // WHEN
+        SortedSet<Row> rows;
+        try {
+            rows =
+                    spec.useMetadata
+                            ? testLookupJoinWithMetadata(lookupTable, spec.maxRows)
+                            : testLookupJoin(lookupTable, spec.maxRows);
+        } catch (Exception e) {
+            // Without this check an unexpected failure in a success spec would pass unnoticed.
+            assertThat(failureAllowed)
+                    .as("Lookup join failed unexpectedly for spec '%s': %s", spec, e)
+                    .isTrue();
+            return;
+        }
+
+        // THEN
+        assertResultsForSpec(spec, rows);
+    }
+
+    /**
+     * Builds the specs for the parameterized lookup join test. Five scenarios (basic, metadata
+     * success, bad status, deserialization error, connection error) are each combined with
+     * every lookup method (GET, POST, PUT), with sync and async polling, and with
+     * continue-on-error off and on. Specs are emitted scenario by scenario.
+     */
+    static Collection<TestSpec> testSpecProvider() {
+        final List<String> scenarios =
+                Arrays.asList("basic", "metadata", "badStatus", "deserError", "connectionError");
+        final List<String> methods = Arrays.asList("GET", "POST", "PUT");
+        final List<Boolean> toggles = Arrays.asList(false, true);
+
+        List<TestSpec> specs = new ArrayList<>();
+        for (String scenario : scenarios) {
+            for (String method : methods) {
+                for (boolean asyncFlag : toggles) {
+                    for (boolean continueOnError : toggles) {
+                        final String testName;
+                        switch (scenario) {
+                            case "metadata":
+                                testName =
+                                        "HTTP Lookup Join With Metadata Success "
+                                                + "continue on error:"
+                                                + continueOnError
+                                                + ", asyncFlag:"
+                                                + asyncFlag;
+                                break;
+                            case "badStatus":
+                                testName =
+                                        "HTTP Lookup Join With Metadata Bad Status "
+                                                + "continue on error:"
+                                                + continueOnError
+                                                + ". asyncFlag:"
+                                                + asyncFlag;
+                                break;
+                            case "deserError":
+                                testName = "HTTP Lookup Join With Metadata Deserialization Error";
+                                break;
+                            case "connectionError":
+                                testName = "HTTP Lookup Join With Metadata Connection Error";
+                                break;
+                            default:
+                                testName = "Basic HTTP Lookup Join";
+                                break;
+                        }
+                        specs.add(
+                                TestSpec.builder()
+                                        .testName(testName)
+                                        .methodName(method)
+                                        // every scenario except the basic one reads metadata
+                                        .useMetadata(!"basic".equals(scenario))
+                                        .badStatus("badStatus".equals(scenario))
+                                        .deserError("deserError".equals(scenario))
+                                        .connectionError("connectionError".equals(scenario))
+                                        .maxRows(4)
+                                        .useAsync(asyncFlag)
+                                        .continueOnError(continueOnError)
+                                        .build());
+                    }
+                }
+            }
+        }
+        return specs;
+    }
+
+ /**
+ * One parameter set for the parameterized lookup join test: which lookup method to use, which
+ * failure (if any) to provoke, and which connector options to set on the lookup table.
+ */
+ @Builder
+ @Data
+ private static class TestSpec {
+ // Test identification, both values are shown by toString()
+ final String testName;
+ final String methodName;
+
+ // Scenario selection
+ // useMetadata: declare and select the metadata columns of the lookup table
+ final boolean useMetadata;
+ // badStatus: the stubbed endpoint answers with status 500
+ final boolean badStatus;
+ // deserError: the stubbed endpoint answers with a body that is not JSON
+ final boolean deserError;
+ // connectionError: the lookup table URL points at port 9999 instead of the stub server
+ final boolean connectionError;
+
+ // Test execution configuration
+ // maxRows: passed to the helper that creates the source and lookup tables
+ final int maxRows;
+ // useAsync: value of the 'asyncPolling' table option
+ final boolean useAsync;
+ // continueOnError: value of the 'http.source.lookup.continue-on-error' table option
+ final boolean continueOnError;
+
+ /** Returns the test name followed by the lookup method in square brackets. */
+ @Override
+ public String toString() {
+ return testName + " [" + methodName + "]";
+ }
+ }
+
+    /**
+     * Registers the WireMock stub that matches the spec. A blank method name is treated as GET.
+     * GET requests are stubbed on the endpoint path; other methods are stubbed through {@code
+     * setUpServerBodyStub}, matching on the {@code $.id} and {@code $.id2} JSON paths of the
+     * request body. Connection-error specs register nothing because their lookup table does not
+     * target the stub server.
+     */
+    private void setupServerStubForSpec(TestSpec spec) {
+        final boolean isGet =
+                StringUtils.isNullOrWhitespaceOnly(spec.methodName)
+                        || spec.methodName.equalsIgnoreCase("GET");
+        final List<StringValuePattern> idMatchers =
+                List.of(matchingJsonPath("$.id"), matchingJsonPath("$.id2"));
+
+        if (spec.badStatus) {
+            // Setup for bad status test
+            if (isGet) {
+                wireMockServer.stubFor(
+                        get(urlPathEqualTo(ENDPOINT))
+                                .withHeader("Content-Type", equalTo("application/json"))
+                                .willReturn(aResponse().withBody(new byte[0]).withStatus(500)));
+            } else {
+                setUpServerBodyStub(
+                        spec.methodName, wireMockServer, idMatchers, Integer.valueOf(500));
+            }
+        } else if (spec.deserError) {
+            // Setup for deserialization error test
+            if (isGet) {
+                wireMockServer.stubFor(
+                        get(urlPathEqualTo(ENDPOINT))
+                                .withHeader("Content-Type", equalTo("application/json"))
+                                .willReturn(
+                                        aResponse()
+                                                .withBody("A test string that is not json")
+                                                .withStatus(200)));
+            } else {
+                setUpServerBodyStub(spec.methodName, wireMockServer, idMatchers, true);
+            }
+        } else if (spec.connectionError) {
+            // No need to set up server stub for connection error test
+            // The test will use a non-existent port (9999)
+        } else {
+            // Setup for success test
+            if (isGet) {
+                setupServerStub(wireMockServer);
+            } else {
+                setUpServerBodyStub(spec.methodName, wireMockServer, idMatchers);
+            }
+        }
+    }
+
+    /**
+     * Builds the {@code CREATE TABLE Customers} statement for the spec. Metadata columns are
+     * added when the spec uses metadata, the lookup method option is added when a method name is
+     * given, and connection-error specs point the URL at port 9999 instead of the stub server
+     * port.
+     */
+    private String createLookupTableSql(TestSpec spec) {
+        final String physicalColumns =
+                "id STRING,"
+                        + "id2 STRING,"
+                        + "msg STRING,"
+                        + "uuid STRING,"
+                        + "details ROW<"
+                        + "isActive BOOLEAN,"
+                        + "nestedDetails ROW<"
+                        + "balance STRING"
+                        + ">"
+                        + ">";
+
+        final String metadataColumns =
+                spec.useMetadata
+                        ? ","
+                                + "errStr STRING METADATA FROM 'error-string',"
+                                + "statusCode INTEGER METADATA FROM 'http-status-code',"
+                                + "headers MAP<STRING, ARRAY<STRING>> METADATA from 'http-headers',"
+                                + "completionState STRING METADATA from 'http-completion-state'"
+                        : "";
+
+        final String lookupMethodOption =
+                StringUtils.isNullOrWhitespaceOnly(spec.methodName)
+                        ? ""
+                        : "'lookup-method' = '" + spec.methodName + "',";
+
+        // Connection-error specs use a fixed port that is not the stub server's port.
+        final String urlOption =
+                spec.connectionError
+                        ? "'url' = 'http://localhost:9999/client',"
+                        : "'url' = 'http://localhost:" + serverPort + "/client',";
+
+        return "CREATE TABLE Customers ("
+                + physicalColumns
+                + metadataColumns
+                + ") WITH ("
+                + "'format' = 'json',"
+                + "'connector' = 'rest-lookup',"
+                + lookupMethodOption
+                + urlOption
+                + "'http.source.lookup.header.Content-Type' = 'application/json',"
+                + "'http.source.lookup.continue-on-error'='"
+                + spec.continueOnError
+                + "',"
+                + "'asyncPolling' = '"
+                + spec.useAsync
+                + "',"
+                + "'table.exec.async-lookup.buffer-capacity' = '50',"
+                + "'table.exec.async-lookup.timeout' = '120s'"
+                + ")";
+    }
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceTest.java
index 8b4451a..7bae0f2 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.connector.http.table.lookup;
import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.connector.http.WireMockServerPortAllocator;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
@@ -31,30 +30,38 @@ import
org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import
org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
+import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
-import javax.annotation.Nullable;
-
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSource.ReadableMetadata.ERROR_STRING;
+import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSource.ReadableMetadata.HTTP_COMPLETION_STATE;
+import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSource.ReadableMetadata.HTTP_HEADERS;
+import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSource.ReadableMetadata.HTTP_STATUS_CODE;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceFactory.row;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/** Test for {@link HttpLookupTableSource}. */
class HttpLookupTableSourceTest {
public static final DataType PHYSICAL_ROW_DATA_TYPE =
@@ -96,6 +103,94 @@ class HttpLookupTableSourceTest {
expectedLookupRow.setLookupPhysicalRowDataType(PHYSICAL_ROW_DATA_TYPE);
}
+    /** Checks that the table source lists every metadata key with the expected data type. */
+    @Test
+    void testListReadableMetadata() {
+        // Expected key -> type mapping, declared up front.
+        Map<String, DataType> expected = new LinkedHashMap<>();
+        expected.put(HTTP_STATUS_CODE.key, new AtomicDataType(new IntType(true)));
+        expected.put(
+                HTTP_HEADERS.key,
+                DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())));
+        expected.put(ERROR_STRING.key, DataTypes.STRING());
+        expected.put(HTTP_COMPLETION_STATE.key, DataTypes.STRING());
+
+        HttpLookupTableSource source =
+                (HttpLookupTableSource) createTableSource(SCHEMA, getOptions());
+
+        assertThat(source.listReadableMetadata()).isEqualTo(expected);
+    }
+
+    /** Checks the summary string reported by the table source. */
+    @Test
+    void testsummaryString() {
+        HttpLookupTableSource source =
+                (HttpLookupTableSource) createTableSource(SCHEMA, getOptions());
+        String summary = source.asSummaryString();
+        assertThat(summary).isEqualTo("Http Lookup Table Source");
+    }
+
+ @Test
+ void testreadReadableMetadata() {
+ HttpLookupTableSource tableSource =
+ (HttpLookupTableSource) createTableSource(SCHEMA,
getOptions());
+ final String testErrorString = "ABC";
+ final int testStatusCode = 500;
+ final HttpCompletionState testCompletionState =
HttpCompletionState.HTTP_ERROR_STATUS;
+ Map<String, List<String>> testHeaders = new HashMap<>();
+ testHeaders.put("AAA", List.of("BBB", "CCC"));
+ testHeaders.put("DDD", List.of("EEE"));
+ HttpRowDataWrapper httpRowDataWrapper =
+ HttpRowDataWrapper.builder()
+ .errorMessage(testErrorString)
+ .httpStatusCode(500)
+ .httpHeadersMap(testHeaders)
+ .httpCompletionState(testCompletionState)
+ .build();
+ assertThat(ERROR_STRING.converter.read(httpRowDataWrapper))
+ .isEqualTo(StringData.fromString(testErrorString));
+ assertThat(ERROR_STRING.converter.read(null)).isNull();
+ assertThat(HTTP_STATUS_CODE.converter.read(httpRowDataWrapper))
+ .isEqualTo(Integer.valueOf(testStatusCode));
+ assertThat(HTTP_STATUS_CODE.converter.read(null)).isNull();
+ Object readResultForHeaders =
HTTP_HEADERS.converter.read(httpRowDataWrapper);
+ assertThat(HTTP_HEADERS.converter.read(null)).isNull();
+ assertThat(readResultForHeaders).isInstanceOf(GenericMapData.class);
+ GenericMapData mapData = (GenericMapData) readResultForHeaders;
+
+ // Verify the map has the expected keys
+ ArrayData keys = mapData.keyArray();
+ assertThat(keys.size()).isEqualTo(2);
+
+ // Create a map to store the converted data for comparison
+ Map<String, List<String>> actualMap =
convertGenericMapDataToMap(mapData, keys);
+ // Now compare the extracted map with the expected map
+ assertThat(actualMap).isEqualTo(testHeaders);
+
+ assertThat(HTTP_COMPLETION_STATE.converter.read(null)).isNull();
+
+ assertThat(HTTP_COMPLETION_STATE.converter.read(httpRowDataWrapper))
+ .isEqualTo(StringData.fromString(testCompletionState.name()));
+ }
+
+ private static Map<String, List<String>> convertGenericMapDataToMap(
+ GenericMapData genericMapData, ArrayData keys) {
+ Map<String, List<String>> map = new HashMap<>();
+ ArrayData valueArray = genericMapData.valueArray();
+ // Extract and convert each key-value pair
+ for (int i = 0; i < keys.size(); i++) {
+ ArrayData values = valueArray.getArray(i);
+ StringData key = keys.getString(i);
+ String keyStr = key.toString();
+ List<String> valueList = new ArrayList<>();
+
+ // Extract each string from the array
+ for (int j = 0; j < values.size(); j++) {
+ StringData element = values.getString(j);
+ valueList.add(element.toString());
+ }
+
+ map.put(keyStr, valueList);
+ }
+ return map;
+ }
+
@Test
@SuppressWarnings("unchecked")
void shouldCreateTableSourceWithParams() {
@@ -168,7 +263,6 @@ class HttpLookupTableSourceTest {
@Override
public void open(CacheMetricGroup cacheMetricGroup) {}
- @Nullable
@Override
public Collection<RowData> getIfPresent(RowData rowData) {
return null;
@@ -252,7 +346,7 @@ class HttpLookupTableSourceTest {
private Map<String, String> getOptions() {
return Map.of(
"connector", "rest-lookup",
- "url", "http://localhost:" +
WireMockServerPortAllocator.PORT_BASE + "/service",
+ "url", "http://localhost:8080/service",
"format", "json");
}
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpRowDataWrapperTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpRowDataWrapperTest.java
new file mode 100644
index 0000000..4948427
--- /dev/null
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpRowDataWrapperTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.table.lookup;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link HttpRowDataWrapper}. */
+public class HttpRowDataWrapperTest {
+
+    /**
+     * Runs {@code shouldIgnore()} over several wrapper configurations. Only the wrapper holding
+     * empty data, a {@code SUCCESS} completion state and nothing else is expected to be ignored.
+     */
+    @Test
+    void testshouldIgnore() {
+        // Empty data + SUCCESS, no other field set: ignored.
+        final HttpRowDataWrapper emptySuccess =
+                HttpRowDataWrapper.builder()
+                        .data(Collections.emptyList())
+                        .httpCompletionState(HttpCompletionState.SUCCESS)
+                        .build();
+        assertThat(emptySuccess.shouldIgnore()).isTrue();
+
+        // Error message without any data: kept.
+        final HttpRowDataWrapper errorOnly =
+                HttpRowDataWrapper.builder()
+                        .errorMessage("aa")
+                        .httpCompletionState(HttpCompletionState.SUCCESS)
+                        .build();
+        assertThat(errorOnly.shouldIgnore()).isFalse();
+
+        // Empty data together with an error message: kept.
+        final HttpRowDataWrapper emptyWithError =
+                HttpRowDataWrapper.builder()
+                        .data(Collections.emptyList())
+                        .errorMessage("aa")
+                        .httpCompletionState(HttpCompletionState.SUCCESS)
+                        .build();
+        assertThat(emptyWithError.shouldIgnore()).isFalse();
+
+        // Empty data together with a (possibly empty) headers map: kept.
+        final HttpRowDataWrapper emptyWithHeaders =
+                HttpRowDataWrapper.builder()
+                        .data(Collections.emptyList())
+                        .httpHeadersMap(new HashMap<>())
+                        .httpCompletionState(HttpCompletionState.SUCCESS)
+                        .build();
+        assertThat(emptyWithHeaders.shouldIgnore()).isFalse();
+
+        // Empty data together with a status code: kept.
+        final HttpRowDataWrapper emptyWithStatus =
+                HttpRowDataWrapper.builder()
+                        .data(Collections.emptyList())
+                        .httpStatusCode(123)
+                        .httpCompletionState(HttpCompletionState.SUCCESS)
+                        .build();
+        assertThat(emptyWithStatus.shouldIgnore()).isFalse();
+
+        // Empty data with a non-SUCCESS completion state: kept.
+        final HttpRowDataWrapper emptyWithException =
+                HttpRowDataWrapper.builder()
+                        .data(Collections.emptyList())
+                        .httpCompletionState(HttpCompletionState.EXCEPTION)
+                        .build();
+        assertThat(emptyWithException.shouldIgnore()).isFalse();
+    }
+}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
index ba1fa0c..91c793e 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
@@ -160,7 +160,7 @@ class JavaNetHttpPollingClientConnectionTest {
JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
- Collection<RowData> results = pollingClient.pull(lookupRowData);
+ Collection<RowData> results =
pollingClient.pull(lookupRowData).getData();
// THEN
wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));
@@ -188,7 +188,7 @@ class JavaNetHttpPollingClientConnectionTest {
setUpPollingClient(setUpBodyRequestFactory(methodName));
// WHEN
- Collection<RowData> results = pollingClient.pull(lookupRowData);
+ Collection<RowData> results =
pollingClient.pull(lookupRowData).getData();
// THEN
wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));
@@ -231,7 +231,7 @@ class JavaNetHttpPollingClientConnectionTest {
JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
- Collection<RowData> results = pollingClient.pull(lookupRowData);
+ Collection<RowData> results =
pollingClient.pull(lookupRowData).getData();
// THEN
wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));
@@ -265,7 +265,7 @@ class JavaNetHttpPollingClientConnectionTest {
JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
- Collection<RowData> results = pollingClient.pull(lookupRowData);
+ Collection<RowData> results =
pollingClient.pull(lookupRowData).getData();
// THEN
wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));
@@ -298,7 +298,7 @@ class JavaNetHttpPollingClientConnectionTest {
JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
- Collection<RowData> results = pollingClient.pull(lookupRowData);
+ Collection<RowData> results =
pollingClient.pull(lookupRowData).getData();
// THEN
assertThat(results.isEmpty()).isEqualTo(isExpectedResponseEmpty);
@@ -322,7 +322,7 @@ class JavaNetHttpPollingClientConnectionTest {
JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
- Collection<RowData> results = pollingClient.pull(null);
+ Collection<RowData> results = pollingClient.pull(null).getData();
// THEN
assertThat(results.isEmpty()).isTrue();
@@ -352,7 +352,7 @@ class JavaNetHttpPollingClientConnectionTest {
JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
- Collection<RowData> results = pollingClient.pull(lookupRowData);
+ Collection<RowData> results =
pollingClient.pull(lookupRowData).getData();
// THEN
wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
index 13ce9cd..27b9f0e 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
@@ -267,7 +267,7 @@ public class JavaNetHttpPollingClientHttpsConnectionTest
extends HttpsConnection
private void testPollingClientConnection() throws ConfigurationException {
JavaNetHttpPollingClient pollingClient =
setUpPollingClient(properties);
- Collection<RowData> result = pollingClient.pull(lookupRowData);
+ Collection<RowData> result =
pollingClient.pull(lookupRowData).getData();
assertResult(result);
}