This is an automated email from the ASF dual-hosted git repository.
ferenc-csaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 04b06e9 [FLINK-39689] Make template placeholders consistent
04b06e9 is described below
commit 04b06e9cb034af50ab7edd5bd9dc0319084a7492
Author: David Radley <[email protected]>
AuthorDate: Mon May 18 16:40:35 2026 +0100
[FLINK-39689] Make template placeholders consistent
---
docs/content.zh/docs/connectors/table/http.md | 16 +++++------
docs/content/docs/connectors/table/http.md | 28 +++++++++----------
.../http/config/HttpConnectorConfigConstants.java | 6 +++++
.../http/table/lookup/RequestFactoryBase.java | 5 +++-
.../GenericJsonAndUrlQueryCreator.java | 31 ++++++++++++----------
.../GenericJsonAndUrlQueryCreatorFactory.java | 2 +-
.../table/lookup/BodyBasedRequestFactoryTest.java | 18 ++++++-------
.../lookup/HttpLookupTableSourceITCaseTest.java | 4 +--
.../http/table/lookup/LookupQueryInfoTest.java | 2 +-
.../GenericJsonAndUrlQueryCreatorTest.java | 25 +++++++++++++++++
10 files changed, 87 insertions(+), 50 deletions(-)
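
For readers skimming the diff below, here is a minimal sketch of the double-brace placeholder convention this commit standardises on. It is not the connector's code: the class name `PlaceholderSketch` and the method `resolve` are hypothetical, and it assumes every substituted value should be URL encoded (the connector skips encoding when the placeholder is the complete URL). Only the `{{` / `}}` delimiters, the `[^{}]+` name pattern, and the "does not exist" error wording are taken from the patch.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration class; not part of flink-connector-http.
class PlaceholderSketch {
    // Same delimiter values the commit adds to HttpConnectorConfigConstants.
    static final String PLACEHOLDER_START = "{{";
    static final String PLACEHOLDER_END = "}}";

    // Quote the delimiters so the braces are treated literally; the name
    // itself may not contain braces, mirroring the pattern in the patch.
    static final Pattern PLACEHOLDER =
            Pattern.compile(
                    Pattern.quote(PLACEHOLDER_START)
                            + "([^{}]+)"
                            + Pattern.quote(PLACEHOLDER_END));

    /** Replaces each {{name}} with the URL-encoded value mapped to name. */
    static String resolve(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = values.get(name);
            if (value == null) {
                throw new IllegalArgumentException(
                        String.format(
                                "Template placeholder %s%s%s references a field that does not exist",
                                PLACEHOLDER_START, name, PLACEHOLDER_END));
            }
            // Assumption: always encode. quoteReplacement guards against
            // '$' or '\' in the encoded value being read as group references.
            String encoded = URLEncoder.encode(value, StandardCharsets.UTF_8);
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(encoded));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        String url =
                resolve(
                        "http://service/{{param1}}/users/{{param2}}",
                        Map.of("param1", "hello world", "param2", "user@example.com"));
        // prints http://service/hello+world/users/user%40example.com
        System.out.println(url);
    }
}
```

Under this sketch a legacy single-brace template such as `http://service/{param1}` is left unchanged, because the pattern only matches the doubled delimiters; existing table definitions would need their `url` option updated to the `{{...}}` form.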
diff --git a/docs/content.zh/docs/connectors/table/http.md
b/docs/content.zh/docs/connectors/table/http.md
index 2713343..13d7a54 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -54,7 +54,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Timeouts](#timeouts)
* [Source table HTTP status code](#source-table-http-status-code)
* [Retries and handling errors (Lookup
source)](#retries-and-handling-errors-lookup-source)
- * [Retry strategy](#retry-strategy)
+ * [Retry strategy](#retry-strategy)
* [Lookup multiple results](#lookup-multiple-results)
* [Working with HTTP sink tables](#working-with-http-sink-tables)
* [HTTP Sink](#http-sink)
@@ -71,7 +71,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Basic Authentication](#basic-authentication)
* [OIDC Bearer Authentication](#oidc-bearer-authentication)
* [Logging the HTTP content](#logging-the-http-content)
- * [Restrictions at this time](#restrictions-at-this-time)
+ * [Restrictions at this time](#restrictions-at-this-time)
<!-- TOC -->
## Dependencies
@@ -87,9 +87,9 @@ The Flink connector does have some changes that you need to
be aware of if you a
* Existing java applications will need to be recompiled to pick up the new
flink package names.
* Existing application and SQL need to be amended to use the new connector
option names. The new option names do not have
- the _com.getindata.http_ prefix, the prefix is now _http_.
+the _com.getindata.http_ prefix, the prefix is now _http_.
* The name of the connector and the identifiers of components that are
discovered have been changed, so that the GetInData jar file can co-exist
- with this connector's jar file. Be aware that if you have created custom
pluggable components; you will need to recompile against this connector.
+with this connector's jar file. Be aware that if you have created custom
pluggable components; you will need to recompile against this connector.
* Note that the `http-generic-json-url` query creator now processes HTTP
bodies differently using `http.request.body-template`.
* Note that if you were incorrectly using
`gid.connector.http.request.query-param-fields` with POST or PUT did not give
an error. This connector corrects the behaviour so specifying
`http.request.query-param-fields` with POST or PUT does give an error.
* The GetInData HTTP connector was built against Flink version 1, so works
with that level of Flink and also Flink version 2. This connector is built
against and supports Flink 2.2.
@@ -142,7 +142,7 @@ CREATE TABLE Orders (
Then we can enrich the _Orders_ table with the _Customers_ HTTP table with the
following SQL:
```roomsql
-SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive,
c.details.nestedDetails.balance FROM Orders AS o
+SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive,
c.details.nestedDetails.balance FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2
= c.id2
```
@@ -249,8 +249,8 @@ parameters `http.request.body-template` and
`http.request.url-map`.
#### Mapping the URL
The `http.request.url-map` option provides a flexible way to map table columns
to parts of the URL, either URL segments or HTTP query parameters.
-Parses a string as a map of strings. For example if there are table columns
called `customerId` and `orderId`,
-then specifying value `customerId:cid,orderID:oid` and a url of
https://myendpoint/customers/{cid}?orders={oid} will mean that the url used for
the
+ Parses a string as a map of strings. For example if there are table columns
called `customerId` and `orderId`,
+then specifying value `customerId:cid,orderID:oid` and a url of
https://myendpoint/customers/{{cid}}?orders={{oid}} will mean that the url used
for the
lookup query will dynamically pickup the values for `customerId`, `orderId`
and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1.
The expected format of the map is: `key1:value1,key2:value2`.
@@ -281,7 +281,7 @@ CREATE TABLE CustomerLookup (
) WITH (
'connector' = 'http',
'format' = 'json',
- 'url' =
'http://api.example.com/lookup?customer={qp_customer}&order={qp_order}',
+ 'url' =
'http://api.example.com/lookup?customer={{qp_customer}}&order={{qp_order}}',
'lookup-method' = 'GET',
'http.request.url-map' = 'qp_customer:qp_customer,qp_order:qp_order'
)
diff --git a/docs/content/docs/connectors/table/http.md
b/docs/content/docs/connectors/table/http.md
index 95089cb..13d7a54 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -90,9 +90,9 @@ The Flink connector does have some changes that you need to
be aware of if you a
the _com.getindata.http_ prefix, the prefix is now _http_.
* The name of the connector and the identifiers of components that are
discovered have been changed, so that the GetInData jar file can co-exist
with this connector's jar file. Be aware that if you have created custom
pluggable components; you will need to recompile against this connector.
-* Note that the `http-generic-json-url` query creator now processes HTTP
bodies differently using `http.request.body-template`.
-* Note that if you were incorrectly using
`gid.connector.http.request.query-param-fields` with POST or PUT did not give
an error. This connector corrects the behaviour so specifying
`http.request.query-param-fields` with POST or PUT does give an error.
-* The GetInData HTTP connector was built against Flink version 1, so works
with that level of Flink and also Flink version 2. This connector is built
against and supports Flink 2.2.
+* Note that the `http-generic-json-url` query creator now processes HTTP
bodies differently using `http.request.body-template`.
+* Note that if you were incorrectly using
`gid.connector.http.request.query-param-fields` with POST or PUT did not give
an error. This connector corrects the behaviour so specifying
`http.request.query-param-fields` with POST or PUT does give an error.
+* The GetInData HTTP connector was built against Flink version 1, so works
with that level of Flink and also Flink version 2. This connector is built
against and supports Flink 2.2.
## Working with HTTP lookup source tables
@@ -142,7 +142,7 @@ CREATE TABLE Orders (
Then we can enrich the _Orders_ table with the _Customers_ HTTP table with the
following SQL:
```roomsql
-SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive,
c.details.nestedDetails.balance FROM Orders AS o
+SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive,
c.details.nestedDetails.balance FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2
= c.id2
```
@@ -249,14 +249,14 @@ parameters `http.request.body-template` and
`http.request.url-map`.
#### Mapping the URL
The `http.request.url-map` option provides a flexible way to map table columns
to parts of the URL, either URL segments or HTTP query parameters.
- Parses a string as a map of strings. For example if there are table columns
called `customerId` and `orderId`,
-then specifying value `customerId:cid,orderID:oid` and a url of
https://myendpoint/customers/{cid}?orders={oid} will mean that the url used for
the
-lookup query will dynamically pickup the values for `customerId`, `orderId`
and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1.
-The expected format of the map is: `key1:value1,key2:value2`.
+ Parses a string as a map of strings. For example if there are table columns
called `customerId` and `orderId`,
+then specifying value `customerId:cid,orderID:oid` and a url of
https://myendpoint/customers/{{cid}}?orders={{oid}} will mean that the url used
for the
+lookup query will dynamically pickup the values for `customerId`, `orderId`
and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1.
+The expected format of the map is: `key1:value1,key2:value2`.
As these values are being supplied as URL segments or part or query
parameters, the connector url encodes that content so characters like spaces
do not appear invalidly in the URL. In the case where the complete url is the
insert then url encoding is not performed; the url needs to be valid and already
-properly url encoded as appropriate.
+properly url encoded as appropriate.
**Example Scenario around clashing request and response columns:**
@@ -281,7 +281,7 @@ CREATE TABLE CustomerLookup (
) WITH (
'connector' = 'http',
'format' = 'json',
- 'url' =
'http://api.example.com/lookup?customer={qp_customer}&order={qp_order}',
+ 'url' =
'http://api.example.com/lookup?customer={{qp_customer}}&order={{qp_order}}',
'lookup-method' = 'GET',
'http.request.url-map' = 'qp_customer:qp_customer,qp_order:qp_order'
)
@@ -360,10 +360,10 @@ This query creator allows you to populate API calls very
flexibly. To do this ef
8) If you start from an OpenAPI specification that contains nested content
required as a lookup join key, then use `http.request.body-template` to map
top-level columns into that structure.
9) Response content is mapped to matching named top-level columns in the
lookup table. You should arrange your table columns so that some are request
columns (all top level) and some are response columns.
10) Use single quotes for the value of `http.request.body-template` so you do
not need to escape the double quotes, and add newline characters for
readability.
-11) If you want to enrich every event with the same API content, you can
specify a placeholder as the complete URL the `url`, then use
`http.request.url-map` to map it. In this scenario switching on caching is
advised to avoid repeated identical API calls.
+11) If you want to enrich every event with the same API content, you can
specify a placeholder as the complete URL the `url`, then use
`http.request.url-map` to map it. In this scenario switching on caching is
advised to avoid repeated identical API calls.
12) Note that columns in SQL tables (the DDL) do not have a natural way to
distinguish between request and response fields. Where possible, use the API
field name as column names in the DDL; this minimizes the number of columns you
need to define.
13) The exception to 12) is when a response API field name is the same as a
request API field **and** they have incompatible types. In this case, define
the request column with a different name, then use
`http.request.query-param-fields-with-key`, `http.request.body-template`,
and/or `http.request.url-map` to provide the mapping to the API field.
-14) Note the columns representing the response are those that should be used
for enrichment.
+14) Note the columns representing the response are those that should be used
for enrichment.
### Format considerations
@@ -654,8 +654,8 @@ Metadata columns can be specified and hold http
information. They are optional r
If the `error-string` metadata column is defined on the table and the call
succeeds then it will have a null value.
When the HTTP response cannot be deserialized, then the
`http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
and the `error-string` will be the response body.
-When the HTTP status code is in the
`http.source.lookup.ignored-response-codes`, then the `http-completion-state`
will
-be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain
information about the API call that
+When the HTTP status code is in the
`http.source.lookup.ignored-response-codes`, then the `http-completion-state`
will
+be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain
information about the API call that
occurred.
When a HTTP lookup call fails and populates the metadata columns with the
error information, the expected enrichment columns from the HTTP call
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 6dfb414..c17e4a8 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -29,6 +29,12 @@ public final class HttpConnectorConfigConstants {
public static final String PROP_DELIM = ",";
+ /** Placeholder delimiter for template substitution (start). */
+ public static final String PLACEHOLDER_START = "{{";
+
+ /** Placeholder delimiter for template substitution (end). */
+ public static final String PLACEHOLDER_END = "}}";
+
/** A property prefix for http connector. */
public static final String FLINK_CONNECTOR_HTTP = "http.";
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
index 5379736..7d115c6 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
@@ -37,6 +37,9 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
+import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.PLACEHOLDER_END;
+import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.PLACEHOLDER_START;
+
/** Base class for {@link HttpRequest} factories. */
@Slf4j
public abstract class RequestFactoryBase implements HttpRequestFactory {
@@ -130,7 +133,7 @@ public abstract class RequestFactoryBase implements
HttpRequestFactory {
if (lookupQueryInfo.hasPathBasedUrlParameters()) {
for (Map.Entry<String, String> entry :
lookupQueryInfo.getPathBasedUrlParameters().entrySet()) {
- String pathParam = "{" + entry.getKey() + "}";
+ String pathParam = PLACEHOLDER_START + entry.getKey() +
PLACEHOLDER_END;
int startIndex = resolvedUrl.indexOf(pathParam);
if (startIndex == -1) {
throw new FlinkRuntimeException(
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
index b273dc5..318ddd9 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.http.LookupArg;
import org.apache.flink.connector.http.LookupQueryCreator;
+import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
import org.apache.flink.connector.http.table.lookup.LookupQueryInfo;
import org.apache.flink.connector.http.table.lookup.LookupRow;
import org.apache.flink.connector.http.utils.SerializationSchemaUtils;
@@ -51,23 +52,23 @@ import java.util.regex.Pattern;
* Generic JSON and URL query creator; in addition to be able to map columns
to json requests, it
* allows url inserts to be mapped to column names using templating. <br>
* <br>
- * For PUT and POST, parameters are mapped to the json body e.g. for the body
template "id1;id2" and
- * url of http://base. At lookup time with values of id1=1 and id2=2 as call
of http/base will be
- * issued with a json payload of {"id1":1,"id2":2} <br>
- * For all http methods, url segments and query parameters can be used to
include lookup up values.
- * Using the map from
<code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of
- * the insert name and the value of the associated column. e.g. for <code>
- * GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP
- * </code> = "key1":"col1" and url of http://base/{key1}. At lookup time with
values of col1="aaaa"
- * a call of http/base/aaaa will be issued. For query parameters, the query
param should be supplied
- * in the URL with a place-holder that will be resolved using <code>
- * GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code>
+ * For PUT and POST, parameters are mapped to the json body e.g. for
REQUEST_PARAM_FIELDS =
+ * "id1;id2" and url of http://base. At lookup time with values of id1=1 and
id2=2 as call of
+ * http/base will be issued with a json payload of {"id1":1,"id2":2} <br>
+ * For all http methods, url segments can be used to include lookup up values.
Using the map from
+ * <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key
of the insert and the
+ * value of the associated column. e.g. for
<code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP
+ * </code> = "key1":"col1" and url of http://base/{{key1}}. At lookup time
with values of
+ * col1="aaaa" a call of http/base/aaaa will be issued.
*/
@Slf4j
public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
private static final long serialVersionUID = 1L;
private static final Pattern TEMPLATE_PLACEHOLDER_PATTERN =
- Pattern.compile("\\{\\{([^}]+)\\}\\}");
+ Pattern.compile(
+
Pattern.quote(HttpConnectorConfigConstants.PLACEHOLDER_START)
+ + "([^{}]+)"
+ +
Pattern.quote(HttpConnectorConfigConstants.PLACEHOLDER_END));
// not final so we can mutate for unit test
private SerializationSchema<RowData> serializationSchema;
@@ -164,8 +165,10 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
if (fieldValue == null) {
throw new IllegalArgumentException(
String.format(
- "Template placeholder {{%s}} references a
field that does not exist in the lookup row",
- fieldName));
+ "Template placeholder %s%s%s references a
field that does not exist in the lookup row",
+ HttpConnectorConfigConstants.PLACEHOLDER_START,
+ fieldName,
+ HttpConnectorConfigConstants.PLACEHOLDER_END));
}
String valueStr =
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
index d7b08db..bf20e9c 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
@@ -66,7 +66,7 @@ public class GenericJsonAndUrlQueryCreatorFactory implements
LookupQueryCreatorF
+ "<br>"
+ "For example if there are table columns
called customerId"
+ " and orderId, then specifying value
customerId:cid1,orderID:oid"
- + " and a url of
https://myendpoint/customers/{cid}/orders/{oid}"
+ + " and a url of
https://myendpoint/customers/{{cid}}/orders/{{oid}}"
+ " will mean that the url used for the
lookup query will"
+ " dynamically pickup the values for
customerId, orderId"
+ " and use them in the url."
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
index 056aff5..5e0d973 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
@@ -125,14 +125,14 @@ public class BodyBasedRequestFactoryTest {
new TestSpec(
null,
Map.of("param1", "value1"),
- "http://service/{param1}",
+ "http://service/{{param1}}",
lookupMethod,
"http://service/value1"),
// 2 path param
new TestSpec(
null,
Map.of("param1", "value1", "param2", "value2"),
- "http://service/{param1}/param2/{param2}",
+ "http://service/{{param1}}/param2/{{param2}}",
lookupMethod,
"http://service/value1/param2/value2"),
// 1 query param
@@ -160,35 +160,35 @@ public class BodyBasedRequestFactoryTest {
new TestSpec(
Map.of("param3", "value3", "param4", "value4"),
Map.of("param1", "value1", "param2", "value2"),
- "http://service/{param1}/param2/{param2}",
+ "http://service/{{param1}}/param2/{{param2}}",
lookupMethod,
"http://service/value1/param2/value2?param3=value3&param4=value4"),
// URL encoding: path param with spaces
new TestSpec(
null,
Map.of("param1", "hello world"),
- "http://service/{param1}",
+ "http://service/{{param1}}",
lookupMethod,
"http://service/hello+world"),
// URL encoding: path param with special characters
new TestSpec(
null,
Map.of("param1", "user@example.com"),
- "http://service/{param1}",
+ "http://service/{{param1}}",
lookupMethod,
"http://service/user%40example.com"),
// URL encoding: path param with slash
new TestSpec(
null,
Map.of("param1", "path/to/resource"),
- "http://service/{param1}",
+ "http://service/{{param1}}",
lookupMethod,
"http://service/path%2Fto%2Fresource"),
// URL encoding: multiple path params with special characters
new TestSpec(
null,
Map.of("param1", "hello world", "param2",
"user@example.com"),
- "http://service/{param1}/users/{param2}",
+ "http://service/{{param1}}/users/{{param2}}",
lookupMethod,
"http://service/hello+world/users/user%40example.com"),
// URL encoding: query param with special characters (?, &, ;,
space)
@@ -209,7 +209,7 @@ public class BodyBasedRequestFactoryTest {
new TestSpec(
Map.of("query1", "value?test"),
Map.of("path1", "user@domain"),
- "http://service/{path1}",
+ "http://service/{{path1}}",
lookupMethod,
"http://service/user%40domain?query1=value%3Ftest"),
// Complete URL replacement with URL-encoded parts
@@ -218,7 +218,7 @@ public class BodyBasedRequestFactoryTest {
Map.of(
"url",
"https://api.example.com/search?q=hello%20world&filter=type%3Dbook&sort=date%3Adesc"),
- "{url}",
+ "{{url}}",
lookupMethod,
"https://api.example.com/search?q=hello%20world&filter=type%3Dbook&sort=date%3Adesc"));
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
index 769303d..5497f1d 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -969,7 +969,7 @@ class HttpLookupTableSourceITCaseTest {
+ " `email` STRING\n"
+ ") WITH ("
+ "'connector' = 'http',"
- + "'url' = '{url}',"
+ + "'url' = '{{url}}',"
+ "'http.request.url-map' = 'url:url',"
+ "'format' = 'json',"
+ "'asyncPolling' = 'false',"
@@ -1235,7 +1235,7 @@ class HttpLookupTableSourceITCaseTest {
+ "'lookup-method' = 'GET',"
+ "'url' = 'http://localhost:"
+ serverPort2
- + "/client?customer={customer}&id2={id2}',"
+ + "/client?customer={{customer}}&id2={{id2}}',"
+ "'http.source.lookup.header.Content-Type' =
'application/json',"
+ "'asyncPolling' = 'true',"
+ "'http.source.lookup.query-creator' =
'http-generic-json-url',"
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
index c627746..0a7521f 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
@@ -68,7 +68,7 @@ class LookupQueryInfoTest {
Map<String, String> pathBasedUrlPathParameters = Map.of("key1",
"value1");
lookupQueryInfo =
- new LookupQueryInfo("http://service/{key1}", null,
pathBasedUrlPathParameters);
+ new LookupQueryInfo("http://service/{{key1}}", null,
pathBasedUrlPathParameters);
assertThat(lookupQueryInfo.hasLookupQuery()).isTrue();
assertThat(lookupQueryInfo.hasPathBasedUrlParameters()).isTrue();
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index defb4ed..d54efd1 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -363,6 +363,31 @@ class GenericJsonAndUrlQueryCreatorTest {
.hasMessageContaining("does not exist");
}
+ @Test
+ public void testBodyTemplateWithInvalidNestedPlaceholders() {
+ // GIVEN - Body template with invalid nested placeholders like
{{aaa}}/{{bbb}}
+ Configuration config = new Configuration();
+ config.set(LOOKUP_METHOD, "POST");
+ config.set(REQUEST_BODY_TEMPLATE, "{\"path\":\"{{aaa}}/{{bbb}}\"}");
+
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ lookupRow.setLookupPhysicalRowDataType(DATATYPE_1);
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN/THEN - Should throw IllegalArgumentException because
{{aaa}}/{{bbb}} is not a valid
+ // field name
+ assertThatThrownBy(() -> creator.createLookupQuery(ROWDATA))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does not exist");
+ }
+
@Test
public void testBodyTemplateWithComplexNestedStructureAndTimestamps()
throws Exception {
// GIVEN - Complex template with primitives, arrays, nested objects,
timestamps, and