This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cdb985  [FLINK-38636] Support specifying HTTP version for lookups
7cdb985 is described below

commit 7cdb98506fbdf54130eaa48f83ad1716c3c9b447
Author: David Radley <[email protected]>
AuthorDate: Wed Nov 12 18:08:49 2025 +0000

    [FLINK-38636] Support specifying HTTP version for lookups
---
 CHANGELOG.md                                       | 285 ---------------------
 docs/content.zh/docs/connectors/table/http.md      |   3 +-
 docs/content/docs/connectors/table/http.md         |   3 +-
 .../http/config/HttpConnectorConfigConstants.java  |   3 +
 .../http/table/lookup/BodyBasedRequestFactory.java |   9 +-
 .../http/table/lookup/GetRequestFactory.java       |   8 +-
 .../table/lookup/HttpLookupConnectorOptions.java   |  13 +
 .../http/table/lookup/RequestFactoryBase.java      |  21 +-
 .../table/lookup/BodyBasedRequestFactoryTest.java  |  56 ++--
 .../Slf4JHttpLookupPostRequestCallbackTest.java    |  41 +++
 10 files changed, 127 insertions(+), 315 deletions(-)
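
The lookup change in the diff below reduces to two steps: parse the optional `http.source.lookup.http-version` value into `HttpClient.Version`, then set it on the request builder only when it was configured. The following is a minimal stand-alone sketch of that idea, not the connector's code. The class name `HttpVersionSketch`, the helper names, and the endpoint URL are hypothetical; only the JDK calls (`HttpClient.Version.valueOf`, `HttpRequest.Builder.version`, `timeout`) and the values `HTTP_1_1` / `HTTP_2` are taken from the commit.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical stand-alone sketch; not the connector's RequestFactoryBase. */
public class HttpVersionSketch {

    /** Maps the raw option value to the JDK enum; null means "not configured". */
    static HttpClient.Version parseVersion(String configured) {
        // valueOf throws IllegalArgumentException for anything other than HTTP_1_1 or HTTP_2.
        return configured == null ? null : HttpClient.Version.valueOf(configured);
    }

    /** Builds a GET request, pinning the HTTP version only when one was configured. */
    static HttpRequest buildGet(String configuredVersion, URI uri, long timeoutSeconds) {
        HttpRequest.Builder builder =
                HttpRequest.newBuilder().timeout(Duration.ofSeconds(timeoutSeconds));
        HttpClient.Version version = parseVersion(configuredVersion);
        if (version != null) {
            builder.version(version);
        }
        return builder.uri(uri).GET().build();
    }

    public static void main(String[] args) {
        URI uri = URI.create("http://localhost:8080/lookup"); // placeholder endpoint
        System.out.println(buildGet("HTTP_1_1", uri, 30).version()); // Optional[HTTP_1_1]
        System.out.println(buildGet(null, uri, 30).version());       // Optional.empty
    }
}
```

When no version is set on the request, the JDK falls back to the version preferred by the `HttpClient` that sends it, which is HTTP/2 unless the client was built otherwise. That is presumably why an explicit `HTTP_1_1` can be needed for endpoints that reject HTTP/2 upgrade attempts.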

diff --git a/CHANGELOG.md b/CHANGELOG.md
deleted file mode 100644
index b8a5c25..0000000
--- a/CHANGELOG.md
+++ /dev/null
@@ -1,285 +0,0 @@
-# Changelog
-
-## [Unreleased]
-
-## [0.20.0] - 2025-05-23
-
--   Added option to define a proxy for the lookup source (including authentication)
-
--   Added support for generic json and URL query creator 
-
--   Retries support for source table:
-    -   Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
-    -   Parameters `gid.connector.http.source.lookup.error.code.exclude"` and `gid.connector.http.source.lookup.error.code` were replaced by `gid.connector.http.source.lookup.ignored-response-codes`.
-    -   Added connection timeout for source table - `gid.connector.http.source.lookup.connection.timeout`.
-
-## [0.19.0] - 2025-03-20
-
--   OIDC token request to not flow during explain
-
-## [0.18.0] - 2025-01-15
-
-### Fixed
-
--   Ignore Eclipse files in .gitignore
--   Support Flink 1.20
-
-## [0.17.0] - 2024-11-28
-
-### Added
-
--   Allow to fetch multiple results from REST API endpoint (`gid.connector.http.source.lookup.result-type`).
-
-## [0.16.0] - 2024-10-18
-
-### Added
-
--   Added support for built in JVM certificates if no security is configured.
--   Added support for OIDC Bearer tokens.
-
-### Fixed
-
--   Ensured SerializationSchema is used in thread-safe way.
-
-## [0.15.0] - 2024-07-30
-
-### Added
-
--   Added support for caching of lookup joins.
-
-### Fixed
-
--   Fixed issue in the logging code of the `JavaNetHttpPollingClient` which prevents showing the status code and response body when the log level is configured at DEBUG (or lower) level.
-
-## [0.14.0] - 2024-05-10
-
-### Added
-
--   Added support for optionally using a custom SLF4J logger to trace HTTP lookup queries.
-    New configuration parameter: `gid.connector.http.source.lookup.request-callback` with default value
-    `slf4j-lookup-logger`. If this parameter is not provided then the default SLF4J logger
-    [Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
-    is used instead.
-
-## [0.13.0] - 2024-04-03
-
-### Added
-
--   Added support for using the result of a lookup join operation in a subsequent select query that adds
-    or removes columns (project pushdown operation).
-
-### Changed
-
--   Changed  [LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java)
-    Any custom implementation of this interface that aims to provide path-based requests is able to provide
-    the lookup query url with parameters surrounded by curly brackets. For example the supplied
-    URL `http://service/{customerId}`, will result in the lookup parameter `customerId` value being used
-    in the url.
-
-### Fixed
-
--   Moved junit support to junit 5, allowing junits to be run against flink 1.17 and 1.18.
-
-## [0.12.0] - 2024-03-22
-
-### Added
-
--   Added support for passing `Authorization` headers for other purposes than Basic Authentication.
-    New configuration parameter: `gid.connector.http.source.lookup.use-raw-authorization-header`.
-    If set to `'true'`, the connector uses the raw value set for the `Authorization` header, without
-    transformation for Basic Authentication (base64, addition of "Basic " prefix).
-    If not specified, defaults to `'false'`.
-
-### Changed
-
--   Changed API for `LookupQueryCreator`. The method `createLookupQuery` no longer returns a String but a
-    [LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java)
-    Any custom implementation of this interface that aims to provide body-based request is able to provide
-    the lookup query as the payload and an optional formatted string representing the query parameters.
-
-## [0.11.0] - 2023-11-20
-
-## [0.10.0] - 2023-07-05
-
-### Fixed
-
-    Fixed an issue where SQL Client did not work with the connector at Flink 1.16.
-
-    This required a change to use a different classloader in the lookup join processing.
-    As well as the classloader change, a change to the PrefixedConfigOption implementation was
-    required, because it was implemented as an extension to ConfigOption; which produced
-    access errors when trying to access the parent class protected methods (the parent class was loaded
-    using a different classloader). The new implementation is not an extension; instead it holds an
-    instance of the ConfigOption as a private variable and uses reflection to instantiate a cloned
-    ConfigOption object with the prefixed key.
-
-### Added
-
--   Add support for batch request submission in HTTP sink. The mode can be changed by setting
-    `gid.connector.http.sink.writer.request.mode` with value `single` or `batch`. The default value
-    is `batch` bode which is breaking change comparing to previous versions. Additionally,
-    `gid.connector.http.sink.request.batch.size` option can be used to set batch size. By default,
-    batch size is 500 which is same as default value of HttpSink `maxBatchSize` parameter.
-
-### Changed
-
--   Changed API for public HttpSink builder. The `setHttpPostRequestCallback` expects a `PostRequestCallback`
-    of generic type [HttpRequest](src/main/java/com/getindata/connectors/http/internal/sink/httpclient/HttpRequest.java)
-    instead `HttpSinkRequestEntry`.
--   Changed HTTP sink request and response processing thread pool sizes from 16 to 1.
-
-## [0.9.0] - 2023-02-10
-
--   Add support for Flink 1.16.
--   Add [SchemaLifecycleAwareElementConverter](src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java) that can be used for createing
-    schema lifecycle aware Element converters for Http Sink.
-
-## [0.8.1] - 2022-12-22
-
-### Fixed
-
--   Fixed issue with not printing HttpRequest body/parameters for Lookup Source by
-    [Slf4JHttpLookupPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java) - <https://github.com/getindata/flink-http-connector/issues/45>
-
-### Removed
-
--   Removed unused reference to EncodingFormat from HttpLookupTableSource
-
-## [0.8.0] - 2022-12-06
-
-### Added
-
--   Add new parameters for HTTP timeout configuration and thread pool size for Sink and Lookup source http requests.
-
-### Fixed
-
--   Fix issue with not cleaning Flink's internal task queue for AsyncIO requests after HTTP timeout in
-    Lookup source - <https://github.com/getindata/flink-http-connector/issues/38>
-
-## [0.7.0] - 2022-10-27
-
--   Add to Lookup Source support for performing lookup on columns with complex types such as ROW, Map etc.
--   Add support for custom Json Serialization format for SQL Lookup Source when using [GenericJsonQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreator.java)
-    The custom format can be defined using Flink's Factory mechanism. The format name can be defined using
-    `lookup-request.format` option. The default format is `json` which means that connector will use FLink's [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
-
-## [0.6.0] - 2022-10-05
-
-### Added
-
--   Add support for other REST methods like PUT and POST to lookup source connector. The request method can be set using
-    new optional lookup-source property `lookup-method`. If property is not specified in table DDL, GET method will be used for
-    lookup queries.
-
-## [0.5.0] - 2022-09-22
-
-### Added
-
--   Add Http Header value preprocessor mechanism, that can preprocess defined header value before setting it on the request.
--   Allow user to specify `Authorization` header for Basic Authentication. The value will be converted to Base64,
-    or if it starts from prefix `Basic `, it will be used as is (without any extra modification).
--   Add TLS and mTLS support for Http Sink and Lookup Source connectors.
-    New properties are:
-    -   `gid.connector.http.security.cert.server` - path to server's certificate.
-    -   `gid.connector.http.security.cert.client` - path to connector's certificate.
-    -   `gid.connector.http.security.key.client` - path to connector's private key.
-    -   `gid.connector.http.security.cert.server.allowSelfSigned` - allowing for self-signed certificates without adding them to KeyStore (not recommended for a production).
--   Add [LookupQueryCreator](src/main/java/com/getindata/connectors/http/LookupQueryCreator.java) and
-    [LookupQueryCreatorFactory](src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java) interfaces
-    (along with a "default"
-    [GenericGetQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java)
-    implementation) for customization of queries prepared by Lookup Source for its HTTP requests.
--   Add [ElasticSearchLiteQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/ElasticSearchLiteQueryCreator.java)
-    that prepares [`q` parameter query](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-api-query-params-q)
-    using Lucene query string syntax (in first versions of ElasticSearch called
-    [Search _Lite_](https://www.elastic.co/guide/en/elasticsearch/guide/current/search-lite.html)).
-
-## [0.4.0] - 2022-08-31
-
-### Added
-
--   Add new properties `gid.connector.http.sink.error.code`,`gid.connector.http.sink.error.code.exclude`,
-    `gid.connector.http.source.lookup.error.code` and `gid.connector.http.source.lookup.error.code.exclude`
-    to set HTTP status codes that should be interpreted as errors both for HTTP Sink and HTTP Lookup Source.
--   Use Flink's format support to Http Lookup Source.
--   Add HTTP Lookup source client header configuration via properties.
--   Add [HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and
-    [HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
-    interfaces (along with a "default"
-    [Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java)
-    implementation) for customizable processing of HTTP Sink requests and responses in Table API.
-
-### Changed
-
--   Change dependency scope for `org.apache.flink.flink-connector-base` from `compile` to `provided`.
--   Changed DDL of `rest-lookup` connector. Dropped `json-path` properties, and add mandatory `format` property.
-
-### Removed
-
--   Remove dependency on `org.apache.httpcomponents.httpclient`from production code. Dependency is only for test scope.
--   Removed dependency on `com.jayway.jsonpath.json-path`
-
-### Fixed
-
--   Fix JavaDoc errors.
-
-## [0.3.0] - 2022-07-21
-
--   Package refactoring. Hide internal classes that does not have to be used by API users under "internal" package.
-    Methods defined in classes located outside "internal" package are considered "public API".
-    Any changes to those methods should be communicated as "not backward compatible" and should be avoided.
--   Add checkstyle configuration to "dev" folder. Add checkstyle check during maven build
--   Add HTTP sink client header configuration via properties.
-
-## [0.2.0] - 2022-07-06
-
--   Implement [HttpSink](src/main/java/com/getindata/connectors/http/HttpSink.java) deriving from [AsyncSinkBase](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink) introduced in Flink 1.15.
--   Add support for Table API in HttpSink in the form of [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java).
-
-## [0.1.0] - 2022-05-26
-
--   Implement basic support for Http connector for Flink SQL
-
-[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.20.0...HEAD
-
-[0.20.0]: https://github.com/getindata/flink-http-connector/compare/0.19.0...0.20.0
-
-[0.19.0]: https://github.com/getindata/flink-http-connector/compare/0.18.0...0.19.0
-
-[0.18.0]: https://github.com/getindata/flink-http-connector/compare/0.17.0...0.18.0
-
-[0.17.0]: https://github.com/getindata/flink-http-connector/compare/0.16.0...0.17.0
-
-[0.16.0]: https://github.com/getindata/flink-http-connector/compare/0.15.0...0.16.0
-
-[0.15.0]: https://github.com/getindata/flink-http-connector/compare/0.14.0...0.15.0
-
-[0.14.0]: https://github.com/getindata/flink-http-connector/compare/0.13.0...0.14.0
-
-[0.13.0]: https://github.com/getindata/flink-http-connector/compare/0.12.0...0.13.0
-
-[0.12.0]: https://github.com/getindata/flink-http-connector/compare/0.11.0...0.12.0
-
-[0.11.0]: https://github.com/getindata/flink-http-connector/compare/0.10.0...0.11.0
-
-[0.10.0]: https://github.com/getindata/flink-http-connector/compare/0.9.0...0.10.0
-
-[0.9.0]: https://github.com/getindata/flink-http-connector/compare/0.8.1...0.9.0
-
-[0.8.1]: https://github.com/getindata/flink-http-connector/compare/0.8.0...0.8.1
-
-[0.8.0]: https://github.com/getindata/flink-http-connector/compare/0.7.0...0.8.0
-
-[0.7.0]: https://github.com/getindata/flink-http-connector/compare/0.6.0...0.7.0
-
-[0.6.0]: https://github.com/getindata/flink-http-connector/compare/0.5.0...0.6.0
-
-[0.5.0]: https://github.com/getindata/flink-http-connector/compare/0.4.0...0.5.0
-
-[0.4.0]: https://github.com/getindata/flink-http-connector/compare/0.3.0...0.4.0
-
-[0.3.0]: https://github.com/getindata/flink-http-connector/compare/0.2.0...0.3.0
-
-[0.2.0]: https://github.com/getindata/flink-http-connector/compare/0.1.0...0.2.0
-
-[0.1.0]: https://github.com/getindata/flink-http-connector/compare/dfe9bfeaa73e77b1de14cd0cb0546a925583e23e...0.1.0
diff --git a/docs/content.zh/docs/connectors/table/http.md b/docs/content.zh/docs/connectors/table/http.md
index c5f5348..2fd62b8 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -179,6 +179,7 @@ Note the options with the prefix _http_ are the HTTP connector specific options,
 | http.security.oidc.token.expiry.reduction                              | 
optional | OIDC tokens will be requested if the current time is later than the 
cached token expiry time minus this value.                                      
                                                                                
                                                                                
                                                                                
                   [...]
 | http.source.lookup.continue-on-error                                   | 
optional | When true, the flow will continue on errors, returning row content. 
When false (the default) the job ends on errors.                                
                                                                                
                                                                                
                                                                                
                   [...]
 | http.source.lookup.request.timeout                                     | 
optional | Sets HTTP request timeout in seconds. If not specified, the default 
value of 30 seconds will be used.                                               
                                                                                
                                                                                
                                                                                
                   [...]
+| http.source.lookup.http-version                                        | 
optional | Version of HTTP to use for lookup http requests. The valid values 
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option 
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 
'HTTP/2 upgrade not supported'.                                                 
                      [...]
 | http.source.lookup.request.thread-pool.size                            | 
optional | Sets the size of pool thread for HTTP lookup request processing. 
Increasing this value would mean that more concurrent requests can be processed 
in the same time. If not specified, the default value of 8 threads will be 
used.                                                                           
                                                                                
                           [...]
 | http.source.lookup.response.thread-pool.size                           | 
optional | Sets the size of pool thread for HTTP lookup response processing. 
Increasing this value would mean that more concurrent requests can be processed 
in the same time. If not specified, the default value of 4 threads will be 
used.                                                                           
                                                                                
                          [...]
 | http.source.lookup.use-raw-authorization-header                        | 
optional | If set to `'true'`, uses the raw value set for the `Authorization` 
header, without transformation for Basic Authentication (base64, addition of 
"Basic " prefix). If not specified, defaults to `'false'`.                      
                                                                                
                                                                                
                       [...]
@@ -310,7 +311,7 @@ Notice that HTTP codes are categorized into into 3 groups:
 - error responses - unexpected responses are not retried. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error.
 
 For temporary errors that have reached max retries attempts (per request) and error responses, the operation will
-succeed if `gid.connector.http.source.lookup.continue-on-error` is true, otherwise the job will fail.
+succeed if `http.source.lookup.continue-on-error` is true, otherwise the job will fail.
 
 ##### Retry strategy
 User can choose retry strategy type for source table:
diff --git a/docs/content/docs/connectors/table/http.md b/docs/content/docs/connectors/table/http.md
index c5f5348..2410a11 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -184,6 +184,7 @@ Note the options with the prefix _http_ are the HTTP connector specific options,
 | http.source.lookup.use-raw-authorization-header                        | 
optional | If set to `'true'`, uses the raw value set for the `Authorization` 
header, without transformation for Basic Authentication (base64, addition of 
"Basic " prefix). If not specified, defaults to `'false'`.                      
                                                                                
                                                                                
                       [...]
 | http.source.lookup.request-callback                                    | 
optional | Specify which `HttpLookupPostRequestCallback` implementation to use. 
By default, it is set to `slf4j-lookup-logger` corresponding to 
`Slf4jHttpLookupPostRequestCallback`.                                           
                                                                                
                                                                                
                                  [...]
 | http.source.lookup.connection.timeout                                  | 
optional | Source table connection timeout. Default - no value.                 
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| http.source.lookup.http-version                                        | 
optional | Version of HTTP to use for lookup http requests. The valid values 
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option 
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 
'HTTP/2 upgrade not supported'.                                                 
                      [...]
 | http.source.lookup.success-codes                                       | 
optional | Comma separated http codes considered as success response. Use 
[1-5]XX for groups and '!' character for excluding.                             
                                                                                
                                                                                
                                                                                
                        [...]
 | http.source.lookup.retry-codes                                         | 
optional | Comma separated http codes considered as transient errors. Use 
[1-5]XX for groups and '!' character for excluding.                             
                                                                                
                                                                                
                                                                                
                        [...]
 | http.source.lookup.ignored-response-codes                              | 
optional | Comma separated http codes. Content for these responses will be 
ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored 
responses togater with `http.source.lookup.success-codes` are considered as 
successful.                                                                     
                                                                                
                                  [...]
@@ -310,7 +311,7 @@ Notice that HTTP codes are categorized into into 3 groups:
 - error responses - unexpected responses are not retried. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error.
 
 For temporary errors that have reached max retries attempts (per request) and error responses, the operation will
-succeed if `gid.connector.http.source.lookup.continue-on-error` is true, otherwise the job will fail.
+succeed if `http.source.lookup.continue-on-error` is true, otherwise the job will fail.
 
 ##### Retry strategy
 User can choose retry strategy type for source table:
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 2715005..9fc3b70 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -56,6 +56,9 @@ public final class HttpConnectorConfigConstants {
 
     public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";
 
+    public static final String SOURCE_LOOKUP_QUERY_HTTP_VERSION =
+            SOURCE_LOOKUP_PREFIX + "http-version";
+
     // --------- Error code handling configuration ---------
 
     // TODO copied from
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
index dab7a38..68fee55 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
@@ -29,7 +29,6 @@ import java.net.URISyntaxException;
 import java.net.http.HttpRequest;
 import java.net.http.HttpRequest.BodyPublishers;
 import java.net.http.HttpRequest.Builder;
-import java.time.Duration;
 
 /**
  * Implementation of {@link HttpRequestFactory} for REST calls that sends their parameters using
@@ -59,10 +58,10 @@ public class BodyBasedRequestFactory extends RequestFactoryBase {
      */
     @Override
     protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
-        return HttpRequest.newBuilder()
-                .uri(constructUri(lookupQueryInfo))
-                .method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()))
-                .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
+        HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
+        builder.uri(constructUri(lookupQueryInfo))
+                .method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
+        return builder;
     }
 
     @Override
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java
index 694f2f5..76584f9 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/GetRequestFactory.java
@@ -28,7 +28,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.http.HttpRequest;
 import java.net.http.HttpRequest.Builder;
-import java.time.Duration;
 
 /** Implementation of {@link HttpRequestFactory} for GET REST calls. */
 @Slf4j
@@ -67,10 +66,9 @@ public class GetRequestFactory extends RequestFactoryBase {
      */
     @Override
     protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
-        return HttpRequest.newBuilder()
-                .uri(constructGetUri(lookupQueryInfo))
-                .GET()
-                .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
+        HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
+        builder.uri(constructGetUri(lookupQueryInfo)).GET();
+        return builder;
     }
 
     URI constructGetUri(LookupQueryInfo lookupQueryInfo) {
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
index 305c3cf..aedf59d 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
@@ -31,6 +31,7 @@ import static org.apache.flink.connector.http.config.HttpConnectorConfigConstant
 import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_CONNECTION_TIMEOUT;
 import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_IGNORE_RESPONSE_CODES;
 import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER;
+import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_HTTP_VERSION;
 import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER;
 import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_PROXY_HOST;
 import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SOURCE_PROXY_PASSWORD;
@@ -74,6 +75,18 @@ public class HttpLookupConnectorOptions {
     public static final ConfigOption<String> LOOKUP_QUERY_CREATOR_IDENTIFIER =
             ConfigOptions.key(SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER).stringType().noDefaultValue();
 
+    public static final ConfigOption<String> LOOKUP_HTTP_VERSION =
+            ConfigOptions.key(SOURCE_LOOKUP_QUERY_HTTP_VERSION)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Version of HTTP to use for lookup HTTP requests. "
+                                    + "The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2"
+                                    + " respectively. This option may be required as HTTP_1_1, if the"
+                                    + " endpoint is http 1.1, because some http 1.1 endpoints reject HTTP"
+                                    + " Version 2 calls, with 'Invalid HTTP request received' and "
+                                    + " 'HTTP/2 upgrade not supported'.");
+
     public static final ConfigOption<String> LOOKUP_REQUEST_FORMAT =
             ConfigOptions.key("lookup-request.format").stringType().defaultValue("json");
 
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
index 56ffbd1..52638f4 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
@@ -28,8 +28,10 @@ import org.apache.flink.util.FlinkRuntimeException;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 
+import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
 import java.net.http.HttpRequest.Builder;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -52,6 +54,8 @@ public abstract class RequestFactoryBase implements HttpRequestFactory {
 
     private final HttpLookupConfig options;
 
+    private final HttpClient.Version httpVersion;
+
     public RequestFactoryBase(
             LookupQueryCreator lookupQueryCreator,
             HeaderPreprocessor headerPreprocessor,
@@ -83,6 +87,14 @@ public abstract class RequestFactoryBase implements HttpRequestFactory {
                                 .getProperty(
                                         HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS,
                                         DEFAULT_REQUEST_TIMEOUT_SECONDS));
+
+        String httpVersionFromConfig =
+                options.getReadableConfig().get(HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION);
+        if (httpVersionFromConfig == null) {
+            httpVersion = null;
+        } else {
+            httpVersion = HttpClient.Version.valueOf(httpVersionFromConfig);
+        }
     }
 
     @Override
@@ -102,12 +114,17 @@ public abstract class RequestFactoryBase implements HttpRequestFactory {
     protected abstract Logger getLogger();
 
     /**
-     * Method for preparing {@link Builder} for concrete REST method.
+     * Method for preparing {@link Builder} for REST method.
      *
      * @param lookupQuery lookup query used for request query parameters or body.
      * @return {@link Builder} for given lookupQuery.
      */
-    protected abstract Builder setUpRequestMethod(LookupQueryInfo lookupQuery);
+    protected Builder setUpRequestMethod(LookupQueryInfo lookupQuery) {
+        HttpRequest.Builder builder =
+                HttpRequest.newBuilder()
+                        .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
+        return httpVersion == null ? builder : builder.version(httpVersion);
+    }
 
     protected static StringBuilder resolvePathParameters(
             LookupQueryInfo lookupQueryInfo, StringBuilder resolvedUrl) {
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
index 2b3282d..be35a3e 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
@@ -18,16 +18,22 @@
 
 package org.apache.flink.connector.http.table.lookup;
 
+import org.apache.flink.configuration.Configuration;
+
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.net.URI;
+import java.net.http.HttpClient;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link BodyBasedRequestFactory}. */
@@ -36,22 +42,40 @@ public class BodyBasedRequestFactoryTest {
     @ParameterizedTest
     @MethodSource("configProvider")
     void testconstructUri(TestSpec testSpec) throws Exception {
-        LookupQueryInfo lookupQueryInfo =
-                new LookupQueryInfo(
-                        testSpec.url,
-                        testSpec.bodyBasedUrlQueryParams,
-                        testSpec.pathBasedUrlParams);
-        HttpLookupConfig httpLookupConfig =
-                HttpLookupConfig.builder()
-                        .lookupMethod(testSpec.lookupMethod)
-                        .url(testSpec.url)
-                        .useAsync(false)
-                        .build();
-        BodyBasedRequestFactory bodyBasedRequestFactory =
-                new BodyBasedRequestFactory("test", null, null, httpLookupConfig);
-
-        URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
-        assertThat(uri.toString()).isEqualTo(testSpec.expected);
+        Set<Configuration> configs = new HashSet();
+
+        Configuration configuration = new Configuration();
+        Configuration configurationHttp11 = new Configuration();
+        Configuration configurationHttp2 = new Configuration();
+
+        configurationHttp2.setString(
+                LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_2));
+        configurationHttp11.setString(
+                LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_1_1));
+
+        configs.add(configuration);
+        configs.add(configurationHttp11);
+        configs.add(configurationHttp2);
+
+        for (Configuration config : configs) {
+            LookupQueryInfo lookupQueryInfo =
+                    new LookupQueryInfo(
+                            testSpec.url,
+                            testSpec.bodyBasedUrlQueryParams,
+                            testSpec.pathBasedUrlParams);
+            HttpLookupConfig httpLookupConfig =
+                    HttpLookupConfig.builder()
+                            .lookupMethod(testSpec.lookupMethod)
+                            .url(testSpec.url)
+                            .useAsync(false)
+                            .readableConfig(config)
+                            .build();
+            BodyBasedRequestFactory bodyBasedRequestFactory =
+                    new BodyBasedRequestFactory("test", null, null, httpLookupConfig);
+
+            URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo);
+            assertThat(uri.toString()).isEqualTo(testSpec.expected);
+        }
     }
 
     private static class TestSpec {
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java
new file mode 100644
index 0000000..4d05341
--- /dev/null
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.table.lookup;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpRequest;
+
+class Slf4JHttpLookupPostRequestCallbackTest {
+    @Test
+    public void testNullResponseDoesNotError() throws URISyntaxException {
+        HttpRequest httpRequest =
+                HttpRequest.newBuilder()
+                        .method("GET", HttpRequest.BodyPublishers.ofString("foo"))
+                        .uri(new URI("http://testing123"))
+                        .build();
+        HttpLookupSourceRequestEntry requestEntry =
+                new HttpLookupSourceRequestEntry(httpRequest, new LookupQueryInfo(""));
+        Slf4JHttpLookupPostRequestCallback slf4JHttpLookupPostRequestCallback =
+                new Slf4JHttpLookupPostRequestCallback();
+        slf4JHttpLookupPostRequestCallback.call(null, requestEntry, "aaa", null);
+    }
+}
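
For readers skimming the diff, the version handling added to RequestFactoryBase can be approximated outside Flink with the JDK classes alone. The sketch below is not part of the commit: the class name HttpVersionSketch, the helper newBuilder, the 30-second timeout and the localhost URL are all hypothetical. It only illustrates the same idea, namely that a null setting leaves the request version unset, while a non-null string is resolved with HttpClient.Version.valueOf.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical stand-alone sketch; not the connector's actual class.
public class HttpVersionSketch {

    // Builds a request builder with a timeout and, if configured, an explicit HTTP version.
    // versionFromConfig is expected to be null, "HTTP_1_1" or "HTTP_2".
    static HttpRequest.Builder newBuilder(String versionFromConfig, long timeoutSeconds) {
        HttpRequest.Builder builder =
                HttpRequest.newBuilder().timeout(Duration.ofSeconds(timeoutSeconds));
        if (versionFromConfig == null) {
            // No version requested: the HttpClient's own preference applies.
            return builder;
        }
        // valueOf throws IllegalArgumentException if the string is not an enum constant name.
        return builder.version(HttpClient.Version.valueOf(versionFromConfig));
    }

    public static void main(String[] args) {
        URI uri = URI.create("http://localhost:8080/lookup"); // assumed endpoint
        HttpRequest unset = newBuilder(null, 30).uri(uri).build();
        HttpRequest http11 = newBuilder("HTTP_1_1", 30).uri(uri).build();
        System.out.println(unset.version().isPresent());  // version left unset
        System.out.println(http11.version().orElseThrow()); // explicit HTTP/1.1
    }
}
```

One consequence of resolving the string with valueOf is that a value written as, say, "HTTP/1.1" is rejected with an IllegalArgumentException rather than silently ignored, so the option value has to match the enum constant name exactly.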

