This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 7cdb985 [FLINK-38636] Support specifying HTTP version for lookups
7cdb985 is described below
commit 7cdb98506fbdf54130eaa48f83ad1716c3c9b447
Author: David Radley <[email protected]>
AuthorDate: Wed Nov 12 18:08:49 2025 +0000
[FLINK-38636] Support specifying HTTP version for lookups
---
CHANGELOG.md | 285 ---------------------
docs/content.zh/docs/connectors/table/http.md | 3 +-
docs/content/docs/connectors/table/http.md | 3 +-
.../http/config/HttpConnectorConfigConstants.java | 3 +
.../http/table/lookup/BodyBasedRequestFactory.java | 9 +-
.../http/table/lookup/GetRequestFactory.java | 8 +-
.../table/lookup/HttpLookupConnectorOptions.java | 13 +
.../http/table/lookup/RequestFactoryBase.java | 21 +-
.../table/lookup/BodyBasedRequestFactoryTest.java | 56 ++--
.../Slf4JHttpLookupPostRequestCallbackTest.java | 41 +++
10 files changed, 127 insertions(+), 315 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
deleted file mode 100644
index b8a5c25..0000000
--- a/CHANGELOG.md
+++ /dev/null
@@ -1,285 +0,0 @@
-# Changelog
-
-## [Unreleased]
-
-## [0.20.0] - 2025-05-23
-
-- Added option to define a proxy for the lookup source (including
authentication)
-
-- Added support for generic json and URL query creator
-
-- Retries support for source table:
- - Auto retry on IOException and user-defined http codes - parameter
`gid.connector.http.source.lookup.retry-codes`.
- - Parameters `gid.connector.http.source.lookup.error.code.exclude"` and
`gid.connector.http.source.lookup.error.code` were replaced by
`gid.connector.http.source.lookup.ignored-response-codes`.
- - Added connection timeout for source table -
`gid.connector.http.source.lookup.connection.timeout`.
-
-## [0.19.0] - 2025-03-20
-
-- OIDC token request to not flow during explain
-
-## [0.18.0] - 2025-01-15
-
-### Fixed
-
-- Ignore Eclipse files in .gitignore
-- Support Flink 1.20
-
-## [0.17.0] - 2024-11-28
-
-### Added
-
-- Allow to fetch multiple results from REST API endpoint
(`gid.connector.http.source.lookup.result-type`).
-
-## [0.16.0] - 2024-10-18
-
-### Added
-
-- Added support for built in JVM certificates if no security is configured.
-- Added support for OIDC Bearer tokens.
-
-### Fixed
-
-- Ensured SerializationSchema is used in thread-safe way.
-
-## [0.15.0] - 2024-07-30
-
-### Added
-
-- Added support for caching of lookup joins.
-
-### Fixed
-
-- Fixed issue in the logging code of the `JavaNetHttpPollingClient` which
prevents showing the status code and response body when the log level is
configured at DEBUG (or lower) level.
-
-## [0.14.0] - 2024-05-10
-
-### Added
-
-- Added support for optionally using a custom SLF4J logger to trace HTTP
lookup queries.
- New configuration parameter:
`gid.connector.http.source.lookup.request-callback` with default value
- `slf4j-lookup-logger`. If this parameter is not provided then the default
SLF4J logger
-
[Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
- is used instead.
-
-## [0.13.0] - 2024-04-03
-
-### Added
-
-- Added support for using the result of a lookup join operation in a
subsequent select query that adds
- or removes columns (project pushdown operation).
-
-### Changed
-
-- Changed
[LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java)
- Any custom implementation of this interface that aims to provide
path-based requests is able to provide
- the lookup query url with parameters surrounded by curly brackets. For
example the supplied
- URL `http://service/{customerId}`, will result in the lookup parameter
`customerId` value being used
- in the url.
-
-### Fixed
-
-- Moved junit support to junit 5, allowing junits to be run against flink
1.17 and 1.18.
-
-## [0.12.0] - 2024-03-22
-
-### Added
-
-- Added support for passing `Authorization` headers for other purposes than
Basic Authentication.
- New configuration parameter:
`gid.connector.http.source.lookup.use-raw-authorization-header`.
- If set to `'true'`, the connector uses the raw value set for the
`Authorization` header, without
- transformation for Basic Authentication (base64, addition of "Basic "
prefix).
- If not specified, defaults to `'false'`.
-
-### Changed
-
-- Changed API for `LookupQueryCreator`. The method `createLookupQuery` no
longer returns a String but a
-
[LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java)
- Any custom implementation of this interface that aims to provide
body-based request is able to provide
- the lookup query as the payload and an optional formatted string
representing the query parameters.
-
-## [0.11.0] - 2023-11-20
-
-## [0.10.0] - 2023-07-05
-
-### Fixed
-
- Fixed an issue where SQL Client did not work with the connector at Flink
1.16.
-
- This required a change to use a different classloader in the lookup join
processing.
- As well as the classloader change, a change to the PrefixedConfigOption
implementation was
- required, because it was implemented as an extension to ConfigOption;
which produced
- access errors when trying to access the parent class protected methods
(the parent class was loaded
- using a different classloader). The new implementation is not an
extension; instead it holds an
- instance of the ConfigOption as a private variable and uses reflection to
instantiate a cloned
- ConfigOption object with the prefixed key.
-
-### Added
-
-- Add support for batch request submission in HTTP sink. The mode can be
changed by setting
- `gid.connector.http.sink.writer.request.mode` with value `single` or
`batch`. The default value
- is `batch` bode which is breaking change comparing to previous versions.
Additionally,
- `gid.connector.http.sink.request.batch.size` option can be used to set
batch size. By default,
- batch size is 500 which is same as default value of HttpSink
`maxBatchSize` parameter.
-
-### Changed
-
-- Changed API for public HttpSink builder. The `setHttpPostRequestCallback`
expects a `PostRequestCallback`
- of generic type
[HttpRequest](src/main/java/com/getindata/connectors/http/internal/sink/httpclient/HttpRequest.java)
- instead `HttpSinkRequestEntry`.
-- Changed HTTP sink request and response processing thread pool sizes from
16 to 1.
-
-## [0.9.0] - 2023-02-10
-
-- Add support for Flink 1.16.
-- Add
[SchemaLifecycleAwareElementConverter](src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java)
that can be used for createing
- schema lifecycle aware Element converters for Http Sink.
-
-## [0.8.1] - 2022-12-22
-
-### Fixed
-
-- Fixed issue with not printing HttpRequest body/parameters for Lookup
Source by
-
[Slf4JHttpLookupPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
- <https://github.com/getindata/flink-http-connector/issues/45>
-
-### Removed
-
-- Removed unused reference to EncodingFormat from HttpLookupTableSource
-
-## [0.8.0] - 2022-12-06
-
-### Added
-
-- Add new parameters for HTTP timeout configuration and thread pool size for
Sink and Lookup source http requests.
-
-### Fixed
-
-- Fix issue with not cleaning Flink's internal task queue for AsyncIO
requests after HTTP timeout in
- Lookup source -
<https://github.com/getindata/flink-http-connector/issues/38>
-
-## [0.7.0] - 2022-10-27
-
-- Add to Lookup Source support for performing lookup on columns with complex
types such as ROW, Map etc.
-- Add support for custom Json Serialization format for SQL Lookup Source
when using
[GenericJsonQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreator.java)
- The custom format can be defined using Flink's Factory mechanism. The
format name can be defined using
- `lookup-request.format` option. The default format is `json` which means
that connector will use FLink's
[json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
-
-## [0.6.0] - 2022-10-05
-
-### Added
-
-- Add support for other REST methods like PUT and POST to lookup source
connector. The request method can be set using
- new optional lookup-source property `lookup-method`. If property is not
specified in table DDL, GET method will be used for
- lookup queries.
-
-## [0.5.0] - 2022-09-22
-
-### Added
-
-- Add Http Header value preprocessor mechanism, that can preprocess defined
header value before setting it on the request.
-- Allow user to specify `Authorization` header for Basic Authentication. The
value will be converted to Base64,
- or if it starts from prefix `Basic `, it will be used as is (without any
extra modification).
-- Add TLS and mTLS support for Http Sink and Lookup Source connectors.
- New properties are:
- - `gid.connector.http.security.cert.server` - path to server's
certificate.
- - `gid.connector.http.security.cert.client` - path to connector's
certificate.
- - `gid.connector.http.security.key.client` - path to connector's private
key.
- - `gid.connector.http.security.cert.server.allowSelfSigned` - allowing
for self-signed certificates without adding them to KeyStore (not recommended
for a production).
-- Add
[LookupQueryCreator](src/main/java/com/getindata/connectors/http/LookupQueryCreator.java)
and
-
[LookupQueryCreatorFactory](src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java)
interfaces
- (along with a "default"
-
[GenericGetQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java)
- implementation) for customization of queries prepared by Lookup Source for
its HTTP requests.
-- Add
[ElasticSearchLiteQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/ElasticSearchLiteQueryCreator.java)
- that prepares [`q` parameter
query](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-api-query-params-q)
- using Lucene query string syntax (in first versions of ElasticSearch called
- [Search
_Lite_](https://www.elastic.co/guide/en/elasticsearch/guide/current/search-lite.html)).
-
-## [0.4.0] - 2022-08-31
-
-### Added
-
-- Add new properties
`gid.connector.http.sink.error.code`,`gid.connector.http.sink.error.code.exclude`,
- `gid.connector.http.source.lookup.error.code` and
`gid.connector.http.source.lookup.error.code.exclude`
- to set HTTP status codes that should be interpreted as errors both for
HTTP Sink and HTTP Lookup Source.
-- Use Flink's format support to Http Lookup Source.
-- Add HTTP Lookup source client header configuration via properties.
-- Add
[HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java)
and
-
[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
- interfaces (along with a "default"
-
[Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java)
- implementation) for customizable processing of HTTP Sink requests and
responses in Table API.
-
-### Changed
-
-- Change dependency scope for `org.apache.flink.flink-connector-base` from
`compile` to `provided`.
-- Changed DDL of `rest-lookup` connector. Dropped `json-path` properties,
and add mandatory `format` property.
-
-### Removed
-
-- Remove dependency on `org.apache.httpcomponents.httpclient`from production
code. Dependency is only for test scope.
-- Removed dependency on `com.jayway.jsonpath.json-path`
-
-### Fixed
-
-- Fix JavaDoc errors.
-
-## [0.3.0] - 2022-07-21
-
-- Package refactoring. Hide internal classes that does not have to be used
by API users under "internal" package.
- Methods defined in classes located outside "internal" package are
considered "public API".
- Any changes to those methods should be communicated as "not backward
compatible" and should be avoided.
-- Add checkstyle configuration to "dev" folder. Add checkstyle check during
maven build
-- Add HTTP sink client header configuration via properties.
-
-## [0.2.0] - 2022-07-06
-
-- Implement
[HttpSink](src/main/java/com/getindata/connectors/http/HttpSink.java) deriving
from
[AsyncSinkBase](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink)
introduced in Flink 1.15.
-- Add support for Table API in HttpSink in the form of
[HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java).
-
-## [0.1.0] - 2022-05-26
-
-- Implement basic support for Http connector for Flink SQL
-
-[Unreleased]:
https://github.com/getindata/flink-http-connector/compare/0.20.0...HEAD
-
-[0.20.0]:
https://github.com/getindata/flink-http-connector/compare/0.19.0...0.20.0
-
-[0.19.0]:
https://github.com/getindata/flink-http-connector/compare/0.18.0...0.19.0
-
-[0.18.0]:
https://github.com/getindata/flink-http-connector/compare/0.17.0...0.18.0
-
-[0.17.0]:
https://github.com/getindata/flink-http-connector/compare/0.16.0...0.17.0
-
-[0.16.0]:
https://github.com/getindata/flink-http-connector/compare/0.15.0...0.16.0
-
-[0.15.0]:
https://github.com/getindata/flink-http-connector/compare/0.14.0...0.15.0
-
-[0.14.0]:
https://github.com/getindata/flink-http-connector/compare/0.13.0...0.14.0
-
-[0.13.0]:
https://github.com/getindata/flink-http-connector/compare/0.12.0...0.13.0
-
-[0.12.0]:
https://github.com/getindata/flink-http-connector/compare/0.11.0...0.12.0
-
-[0.11.0]:
https://github.com/getindata/flink-http-connector/compare/0.10.0...0.11.0
-
-[0.10.0]:
https://github.com/getindata/flink-http-connector/compare/0.9.0...0.10.0
-
-[0.9.0]:
https://github.com/getindata/flink-http-connector/compare/0.8.1...0.9.0
-
-[0.8.1]:
https://github.com/getindata/flink-http-connector/compare/0.8.0...0.8.1
-
-[0.8.0]:
https://github.com/getindata/flink-http-connector/compare/0.7.0...0.8.0
-
-[0.7.0]:
https://github.com/getindata/flink-http-connector/compare/0.6.0...0.7.0
-
-[0.6.0]:
https://github.com/getindata/flink-http-connector/compare/0.5.0...0.6.0
-
-[0.5.0]:
https://github.com/getindata/flink-http-connector/compare/0.4.0...0.5.0
-
-[0.4.0]:
https://github.com/getindata/flink-http-connector/compare/0.3.0...0.4.0
-
-[0.3.0]:
https://github.com/getindata/flink-http-connector/compare/0.2.0...0.3.0
-
-[0.2.0]:
https://github.com/getindata/flink-http-connector/compare/0.1.0...0.2.0
-
-[0.1.0]:
https://github.com/getindata/flink-http-connector/compare/dfe9bfeaa73e77b1de14cd0cb0546a925583e23e...0.1.0
diff --git a/docs/content.zh/docs/connectors/table/http.md
b/docs/content.zh/docs/connectors/table/http.md
index c5f5348..2fd62b8 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -179,6 +179,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.security.oidc.token.expiry.reduction |
optional | OIDC tokens will be requested if the current time is later than the
cached token expiry time minus this value.
[...]
| http.source.lookup.continue-on-error |
optional | When true, the flow will continue on errors, returning row content.
When false (the default) the job ends on errors.
[...]
| http.source.lookup.request.timeout |
optional | Sets HTTP request timeout in seconds. If not specified, the default
value of 30 seconds will be used.
[...]
+| http.source.lookup.http-version |
optional | Version of HTTP to use for lookup http requests. The valid values
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and
'HTTP/2 upgrade not supported'.
[...]
| http.source.lookup.request.thread-pool.size |
optional | Sets the size of pool thread for HTTP lookup request processing.
Increasing this value would mean that more concurrent requests can be processed
in the same time. If not specified, the default value of 8 threads will be
used.
[...]
| http.source.lookup.response.thread-pool.size |
optional | Sets the size of pool thread for HTTP lookup response processing.
Increasing this value would mean that more concurrent requests can be processed
in the same time. If not specified, the default value of 4 threads will be
used.
[...]
| http.source.lookup.use-raw-authorization-header |
optional | If set to `'true'`, uses the raw value set for the `Authorization`
header, without transformation for Basic Authentication (base64, addition of
"Basic " prefix). If not specified, defaults to `'false'`.
[...]
@@ -310,7 +311,7 @@ Notice that HTTP codes are categorized into into 3 groups:
- error responses - unexpected responses are not retried. Any HTTP error code
which is not configured as successful or temporary error is treated as an
unretriable error.
For temporary errors that have reached max retries attempts (per request) and
error responses, the operation will
-succeed if `gid.connector.http.source.lookup.continue-on-error` is true,
otherwise the job will fail.
+succeed if `http.source.lookup.continue-on-error` is true, otherwise the job
will fail.
##### Retry strategy
User can choose retry strategy type for source table:
diff --git a/docs/content/docs/connectors/table/http.md
b/docs/content/docs/connectors/table/http.md
index c5f5348..2410a11 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -184,6 +184,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.source.lookup.use-raw-authorization-header |
optional | If set to `'true'`, uses the raw value set for the `Authorization`
header, without transformation for Basic Authentication (base64, addition of
"Basic " prefix). If not specified, defaults to `'false'`.
[...]
| http.source.lookup.request-callback |
optional | Specify which `HttpLookupPostRequestCallback` implementation to use.
By default, it is set to `slf4j-lookup-logger` corresponding to
`Slf4jHttpLookupPostRequestCallback`.
[...]
| http.source.lookup.connection.timeout |
optional | Source table connection timeout. Default - no value.
[...]
+| http.source.lookup.http-version |
optional | Version of HTTP to use for lookup http requests. The valid values
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and
'HTTP/2 upgrade not supported'.
[...]
| http.source.lookup.success-codes |
optional | Comma separated http codes considered as success response. Use
[1-5]XX for groups and '!' character for excluding.
[...]
| http.source.lookup.retry-codes |
optional | Comma separated http codes considered as transient errors. Use
[1-5]XX for groups and '!' character for excluding.
[...]
| http.source.lookup.ignored-response-codes |
optional | Comma separated http codes. Content for these responses will be
ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored
responses togater with `http.source.lookup.success-codes` are considered as
successful.
[...]
@@ -310,7 +311,7 @@ Notice that HTTP codes are categorized into into 3 groups:
- error responses - unexpected responses are not retried. Any HTTP error code
which is not configured as successful or temporary error is treated as an
unretriable error.
For temporary errors that have reached max retries attempts (per request) and
error responses, the operation will
-succeed if `gid.connector.http.source.lookup.continue-on-error` is true,
otherwise the job will fail.
+succeed if `http.source.lookup.continue-on-error` is true, otherwise the job
will fail.
##### Retry strategy
User can choose retry strategy type for source table:
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 2715005..9fc3b70 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -56,6 +56,9 @@ public final class HttpConnectorConfigConstants {
public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX +
"result-type";
+ public static final String SOURCE_LOOKUP_QUERY_HTTP_VERSION =
+ SOURCE_LOOKUP_PREFIX + "http-version";
+
// --------- Error code handling configuration ---------
// TODO copied from
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
index dab7a38..68fee55 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
@@ -29,7 +29,6 @@ import java.net.URISyntaxException;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
-import java.time.Duration;
/**
* Implementation of {@link HttpRequestFactory} for REST calls that sends
their parameters using
@@ -59,10 +58,10 @@ public class BodyBasedRequestFactory extends
RequestFactoryBase {
*/
@Override
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
- return HttpRequest.newBuilder()
- .uri(constructUri(lookupQueryInfo))
- .method(methodName,
BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()))
- .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
+ HttpRequest.Builder builder =
super.setUpRequestMethod(lookupQueryInfo);
+ builder.uri(constructUri(lookupQueryInfo))
+ .method(methodName,
BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
+ return builder;
}
@Override
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java
index 694f2f5..76584f9 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java
@@ -28,7 +28,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.Builder;
-import java.time.Duration;
/** Implementation of {@link HttpRequestFactory} for GET REST calls. */
@Slf4j
@@ -67,10 +66,9 @@ public class GetRequestFactory extends RequestFactoryBase {
*/
@Override
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
- return HttpRequest.newBuilder()
- .uri(constructGetUri(lookupQueryInfo))
- .GET()
- .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
+ HttpRequest.Builder builder =
super.setUpRequestMethod(lookupQueryInfo);
+ builder.uri(constructGetUri(lookupQueryInfo)).GET();
+ return builder;
}
URI constructGetUri(LookupQueryInfo lookupQueryInfo) {
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
index 305c3cf..aedf59d 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
@@ -31,6 +31,7 @@ import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstant
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_CONNECTION_TIMEOUT;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_IGNORE_RESPONSE_CODES;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER;
+import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_HTTP_VERSION;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_PROXY_HOST;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_PROXY_PASSWORD;
@@ -74,6 +75,18 @@ public class HttpLookupConnectorOptions {
public static final ConfigOption<String> LOOKUP_QUERY_CREATOR_IDENTIFIER =
ConfigOptions.key(SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER).stringType().noDefaultValue();
+ public static final ConfigOption<String> LOOKUP_HTTP_VERSION =
+ ConfigOptions.key(SOURCE_LOOKUP_QUERY_HTTP_VERSION)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Version of HTTP to use for lookup HTTP requests. "
+ + "The valid values are HTTP_1_1 and
HTTP_2, which specify HTTP 1.1 or 2"
+ + " respectively. This option may be
required as HTTP_1_1, if the"
+ + " endpoint is http 1.1, because some
http 1.1 endpoints reject HTTP"
+ + " Version 2 calls, with 'Invalid HTTP
request received' and "
+ + " 'HTTP/2 upgrade not supported'.");
+
public static final ConfigOption<String> LOOKUP_REQUEST_FORMAT =
ConfigOptions.key("lookup-request.format").stringType().defaultValue("json");
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
index 56ffbd1..52638f4 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
@@ -28,8 +28,10 @@ import org.apache.flink.util.FlinkRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
+import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.Builder;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
@@ -52,6 +54,8 @@ public abstract class RequestFactoryBase implements
HttpRequestFactory {
private final HttpLookupConfig options;
+ private final HttpClient.Version httpVersion;
+
public RequestFactoryBase(
LookupQueryCreator lookupQueryCreator,
HeaderPreprocessor headerPreprocessor,
@@ -83,6 +87,14 @@ public abstract class RequestFactoryBase implements
HttpRequestFactory {
.getProperty(
HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS,
DEFAULT_REQUEST_TIMEOUT_SECONDS));
+
+ String httpVersionFromConfig =
+
options.getReadableConfig().get(HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION);
+ if (httpVersionFromConfig == null) {
+ httpVersion = null;
+ } else {
+ httpVersion = HttpClient.Version.valueOf(httpVersionFromConfig);
+ }
}
@Override
@@ -102,12 +114,17 @@ public abstract class RequestFactoryBase implements
HttpRequestFactory {
protected abstract Logger getLogger();
/**
- * Method for preparing {@link Builder} for concrete REST method.
+ * Method for preparing {@link Builder} for REST method.
*
* @param lookupQuery lookup query used for request query parameters or
body.
* @return {@link Builder} for given lookupQuery.
*/
- protected abstract Builder setUpRequestMethod(LookupQueryInfo lookupQuery);
+ protected Builder setUpRequestMethod(LookupQueryInfo lookupQuery) {
+ HttpRequest.Builder builder =
+ HttpRequest.newBuilder()
+
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
+ return httpVersion == null ? builder : builder.version(httpVersion);
+ }
protected static StringBuilder resolvePathParameters(
LookupQueryInfo lookupQueryInfo, StringBuilder resolvedUrl) {
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
index 2b3282d..be35a3e 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
@@ -18,16 +18,22 @@
package org.apache.flink.connector.http.table.lookup;
+import org.apache.flink.configuration.Configuration;
+
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.net.URI;
+import java.net.http.HttpClient;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link BodyBasedRequestFactory}. */
@@ -36,22 +42,40 @@ public class BodyBasedRequestFactoryTest {
@ParameterizedTest
@MethodSource("configProvider")
void testconstructUri(TestSpec testSpec) throws Exception {
- LookupQueryInfo lookupQueryInfo =
- new LookupQueryInfo(
- testSpec.url,
- testSpec.bodyBasedUrlQueryParams,
- testSpec.pathBasedUrlParams);
- HttpLookupConfig httpLookupConfig =
- HttpLookupConfig.builder()
- .lookupMethod(testSpec.lookupMethod)
- .url(testSpec.url)
- .useAsync(false)
- .build();
- BodyBasedRequestFactory bodyBasedRequestFactory =
- new BodyBasedRequestFactory("test", null, null,
httpLookupConfig);
-
- URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
- assertThat(uri.toString()).isEqualTo(testSpec.expected);
+ Set<Configuration> configs = new HashSet();
+
+ Configuration configuration = new Configuration();
+ Configuration configurationHttp11 = new Configuration();
+ Configuration configurationHttp2 = new Configuration();
+
+ configurationHttp2.setString(
+ LOOKUP_HTTP_VERSION,
String.valueOf(HttpClient.Version.HTTP_2));
+ configurationHttp11.setString(
+ LOOKUP_HTTP_VERSION,
String.valueOf(HttpClient.Version.HTTP_1_1));
+
+ configs.add(configuration);
+ configs.add(configurationHttp11);
+ configs.add(configurationHttp2);
+
+ for (Configuration config : configs) {
+ LookupQueryInfo lookupQueryInfo =
+ new LookupQueryInfo(
+ testSpec.url,
+ testSpec.bodyBasedUrlQueryParams,
+ testSpec.pathBasedUrlParams);
+ HttpLookupConfig httpLookupConfig =
+ HttpLookupConfig.builder()
+ .lookupMethod(testSpec.lookupMethod)
+ .url(testSpec.url)
+ .useAsync(false)
+ .readableConfig(config)
+ .build();
+ BodyBasedRequestFactory bodyBasedRequestFactory =
+ new BodyBasedRequestFactory("test", null, null,
httpLookupConfig);
+
+ URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
+ assertThat(uri.toString()).isEqualTo(testSpec.expected);
+ }
}
private static class TestSpec {
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java
new file mode 100644
index 0000000..4d05341
--- /dev/null
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.table.lookup;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpRequest;
+
+class Slf4JHttpLookupPostRequestCallbackTest {
+ @Test
+ public void testNullResponseDoesNotError() throws URISyntaxException {
+ HttpRequest httpRequest =
+ HttpRequest.newBuilder()
+ .method("GET",
HttpRequest.BodyPublishers.ofString("foo"))
+ .uri(new URI("http://testing123"))
+ .build();
+ HttpLookupSourceRequestEntry requestEntry =
+ new HttpLookupSourceRequestEntry(httpRequest, new
LookupQueryInfo(""));
+ Slf4JHttpLookupPostRequestCallback slf4JHttpLookupPostRequestCallback =
+ new Slf4JHttpLookupPostRequestCallback();
+ slf4JHttpLookupPostRequestCallback.call(null, requestEntry, "aaa",
null);
+ }
+}