ferenc-csaky commented on code in PR #36:
URL: https://github.com/apache/flink-connector-http/pull/36#discussion_r2982364987
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java:
##########
@@ -178,6 +162,42 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams);
}
+ /**
+ * Substitutes placeholders in the template with values from the JSON object. Placeholders are
+ * in the format {@code <fieldName>} where fieldName is a top-level field in the JSON object.
+ *
+ * @param template the template string with placeholders
+ * @param jsonObject the JSON object containing field values
+ * @return the template with placeholders replaced by actual values
+ */
+ private String substituteTemplate(String template, ObjectNode jsonObject) {
+ java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("<([^>]+)>");
+ java.util.regex.Matcher matcher = pattern.matcher(template);
+
+ StringBuffer result = new StringBuffer();
Review Comment:
Why `StringBuffer`? `StringBuilder` should be the default, since it isn't synchronized and so avoids
`StringBuffer`'s locking overhead; `StringBuffer` is only worth it under specific concurrency
requirements.
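A minimal sketch of `substituteTemplate` with both suggestions from this review applied (`StringBuilder` instead of `StringBuffer`, and imported `java.util.regex` classes). To stay self-contained it takes a hypothetical `Map<String, String>` of values already serialized as JSON literals instead of the PR's `ObjectNode`, and it assumes unknown placeholders should be left untouched; the class name is hypothetical too. One caveat: the `Matcher.appendReplacement`/`appendTail` overloads that accept a `StringBuilder` were added in Java 9, so on a Java 8 target `StringBuffer` is the only option for this loop.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical class; illustrates the suggested shape, not the PR's actual code.
public final class PlaceholderSubstitutionSketch {

    // Compiled once; matches <fieldName> and captures the field name.
    private static final Pattern PLACEHOLDER = Pattern.compile("<([^>]+)>");

    private PlaceholderSubstitutionSketch() {}

    /**
     * Replaces every <fieldName> in the template with its pre-serialized JSON value.
     * Assumption: placeholders without a matching entry are left as-is.
     */
    public static String substituteTemplate(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        // Method-local buffer, so no synchronization is needed (Java 9+ overloads).
        StringBuilder result = new StringBuilder();
        while (matcher.find()) {
            String replacement = values.getOrDefault(matcher.group(1), matcher.group());
            // quoteReplacement stops '$' and '\' in values from being read as group references.
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        String body = substituteTemplate(
                "{\"user\": {\"id\": <userId>}, \"source\": \"flink\"}",
                Map.of("userId", "42"));
        System.out.println(body); // prints {"user": {"id": 42}, "source": "flink"}
    }
}
```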
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java:
##########
@@ -178,6 +162,42 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams);
}
+ /**
+ * Substitutes placeholders in the template with values from the JSON object. Placeholders are
+ * in the format {@code <fieldName>} where fieldName is a top-level field in the JSON object.
+ *
+ * @param template the template string with placeholders
+ * @param jsonObject the JSON object containing field values
+ * @return the template with placeholders replaced by actual values
+ */
+ private String substituteTemplate(String template, ObjectNode jsonObject) {
+ java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("<([^>]+)>");
+ java.util.regex.Matcher matcher = pattern.matcher(template);
Review Comment:
nit: Let's import these regex classes.
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java:
##########
@@ -178,6 +162,42 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams);
}
+ /**
+ * Substitutes placeholders in the template with values from the JSON object. Placeholders are
+ * in the format {@code <fieldName>} where fieldName is a top-level field in the JSON object.
+ *
+ * @param template the template string with placeholders
+ * @param jsonObject the JSON object containing field values
+ * @return the template with placeholders replaced by actual values
+ */
+ private String substituteTemplate(String template, ObjectNode jsonObject) {
Review Comment:
Any specific reason for the `<variableName>` placeholder syntax? AFAIK no widely used JSON
templating engine uses it, so if there's no particular reason, let's not create yet another syntax.
I'd rather go with the Mustache/Handlebars syntax, `{{variableName}}`, which is the most popular IMO.
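For comparison, a sketch of the same substitution loop matching the suggested Mustache/Handlebars-style `{{variableName}}` tags. It mimics only Mustache's variable-tag syntax, including the optional whitespace inside the braces that Mustache permits; it is not a Mustache implementation (no sections, partials, or escaping). The class name and the `Map`-based input of pre-serialized JSON values are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical class; only the {{name}} tag syntax is borrowed from Mustache.
public final class MustachePlaceholderSketch {

    // Matches {{name}} or {{ name }}; the name may not contain braces or whitespace.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{\\{\\s*([^{}\\s]+)\\s*\\}\\}");

    private MustachePlaceholderSketch() {}

    /** Replaces each {{name}} tag with its pre-serialized JSON value; unknown names are kept. */
    public static String substitute(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder result = new StringBuilder();
        while (matcher.find()) {
            String replacement = values.getOrDefault(matcher.group(1), matcher.group());
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        String template = "{\"user\": {\"id\": {{userId}}, \"name\": {{ userName }}},"
                + " \"source\": \"flink\"}";
        // Values are assumed to be JSON literals already, so the string carries its own quotes.
        System.out.println(substitute(template, Map.of("userId", "42", "userName", "\"Alice\"")));
        // prints {"user": {"id": 42, "name": "Alice"}, "source": "flink"}
    }
}
```

Because the captured name excludes braces, a tag immediately followed by the closing braces of the surrounding JSON object, as in the `main` example, is still matched correctly.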
##########
docs/content.zh/docs/connectors/table/http.md:
##########
@@ -163,50 +163,49 @@ Or for REST POST method they will be converted to Json and used as request body.
Note the options with the prefix _http_ are the HTTP connector specific options, the others are Flink options.
-| Option | Required | Description/Value |
-|:--------------------|----------|--------------------|
-| connector | required | The Value should be set to _http_ |
-| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
-| url | required | The base URL that should be used for GET requests. For example _http://localhost:8080/client_ |
-| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
-| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
-| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
-| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following <a href="#lookup-cache">Lookup Cache</a> section for more detail. Set value 0 to disable retries. |
-| http.security.cert.server | optional | Comma separated paths to trusted HTTP server certificates that should be added to the connectors trust store. |
-| http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
-| http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
-| http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
-| http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
-| http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
-| http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
-| http.source.lookup.continue-on-error | optional | When true, the flow will continue on errors, returning row content. When false (the default) the job ends on errors. |
-| http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
-| http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
-| http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
-| http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
-| http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |
-| http.source.lookup.connection.timeout | optional | Source table connection timeout. Default - no value. |
-| http.source.lookup.http-version | optional | Version of HTTP to use for lookup http requests. The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 'HTTP/2 upgrade not supported'. |
-| http.source.lookup.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. The default is 2XX. |
-| http.source.lookup.retry-codes | optional | Comma separated http codes considered as transient errors. Use [1-5]XX for groups and '!' character for excluding. The default is 500,503,504. |
-| http.source.lookup.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored responses together with `http.source.lookup.success-codes` are considered as successful. |
-| http.source.lookup.retry-strategy.type | optional | Auto retry strategy type: fixed-delay (default) or exponential-delay. |
-| http.source.lookup.retry-strategy.fixed-delay.delay | optional | Fixed-delay interval between retries. Default 1 second. Use with`lookup.max-retries` parameter. |
-| http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. |
-| http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. |
-| http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 |
-| http.source.lookup.proxy.host | optional | Specify the hostname of the proxy. |
-| http.source.lookup.proxy.port | optional | Specify the port of the proxy. |
-| http.source.lookup.proxy.username | optional | Specify the username used for proxy authentication. |
-| http.source.lookup.proxy.password | optional | Specify the password used for proxy authentication. |
-| http.request.query-param-fields | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The names of the fields that will be mapped to query parameters. The parameters are separated by semicolons, such as `param1;param2`. |
-| http.request.body-fields | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The names of the fields that will be mapped to the body. The parameters are separated by semicolons, such as `param1;param2`. |
|
-| http.request.additional-body-json | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. Additional JSON content to be merged into the request body for PUT and POST operations. The value should be a valid JSON object string (e.g., `'{"opportunity":{"source":"flink"},"priority":1}'`) that will be parsed and its fields merged at the top level with the generated request body. For example, if the body is `{"id":123}` and additional-body-json is `'{"extra":"value"}'`, the result will be `{"id":123,"extra":"value"}`. Supports nested objects and arrays. |
-| http.request.url-map | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The map of insert names to column names used as url segments. Parses a string as a map of strings. For example if there are table columns called `customerId` and `orderId`, then specifying value `customerId:cid,orderID:oid` and a url of https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for the lookup query will dynamically pickup the values for `customerId`, `orderId` and use them in the url e.g. https://myendpoint/customers/cid1/orders/oid1. The expected format of the map is: `key1:value1,key2:value2`. |
+| Option | Required | Description/Value |
+|:--------------------|----------|--------------------|
+| connector | required | The Value should be set to _http_ |
+| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
+| url | required | The base URL that should be used for GET requests. For example _http://localhost:8080/client_ |
+| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
+| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
+| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
+| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following <a href="#lookup-cache">Lookup Cache</a> section for more detail. Set value 0 to disable retries. |
+| http.security.cert.server | optional | Comma separated paths to trusted HTTP server certificates that should be added to the connectors trust store. |
+| http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
+| http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
+| http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
+| http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
+| http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
+| http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
+| http.source.lookup.continue-on-error | optional | When true, the flow will continue on errors, returning row content. When false (the default) the job ends on errors. |
+| http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
+| http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
+| http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
+| http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
+| http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |
+| http.source.lookup.connection.timeout | optional | Source table connection timeout. Default - no value. |
+| http.source.lookup.http-version | optional | Version of HTTP to use for lookup http requests. The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 'HTTP/2 upgrade not supported'. |
+| http.source.lookup.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. The default is 2XX. |
+| http.source.lookup.retry-codes | optional | Comma separated http codes considered as transient errors. Use [1-5]XX for groups and '!' character for excluding. The default is 500,503,504. |
+| http.source.lookup.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored responses together with `http.source.lookup.success-codes` are considered as successful. |
+| http.source.lookup.retry-strategy.type | optional | Auto retry strategy type: fixed-delay (default) or exponential-delay. |
+| http.source.lookup.retry-strategy.fixed-delay.delay | optional | Fixed-delay interval between retries. Default 1 second. Use with`lookup.max-retries` parameter. |
+| http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. |
+| http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. |
+| http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 |
+| http.source.lookup.proxy.host | optional | Specify the hostname of the proxy. |
+| http.source.lookup.proxy.port | optional | Specify the port of the proxy. |
+| http.source.lookup.proxy.username | optional | Specify the username used for proxy authentication. |
+| http.source.lookup.proxy.password | optional | Specify the password used for proxy authentication. |
+| http.request.query-param-fields | optional | Used for the `http-generic-json-url` query creator. The names of the fields that will be mapped to query parameters. The parameters are separated by semicolons, such as `param1;param2`. |
+| http.request.body-template | optional | Used for the `http-generic-json-url` query creator. A JSON template string for constructing the request body for PUT and POST operations. Use `<fieldName>` placeholders to reference top-level columns from the lookup table. Supports creating complex nested JSON structures with both placeholders and literal values. Example: `'{"user": {"id": <userId>, "name": <userName>}, "source": "flink"}'`. See the [Body Template](#body-template) section for details and examples. |
Review Comment:
Since we explicitly refer to the detailed section here, I think we can remove the example from this
description.
--
This is an automated message from the Apache Git Service.
