This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new fd9e218 [FLINK-38816] Use Flink 2 compatible deserialize in
`JavaNetHttpPollingClient`
fd9e218 is described below
commit fd9e218f1ccb7419cb5083b5de53f3f6124df33c
Author: David Radley <[email protected]>
AuthorDate: Mon Jan 12 12:43:50 2026 +0000
[FLINK-38816] Use Flink 2 compatible deserialize in
`JavaNetHttpPollingClient`
---
.gitignore | 1 +
.../table/lookup/JavaNetHttpPollingClient.java | 40 +++-
.../table/lookup/JavaNetHttpPollingClientTest.java | 224 +++++++++++++++++++++
3 files changed, 255 insertions(+), 10 deletions(-)
diff --git a/.gitignore b/.gitignore
index 649e1b7..05a0152 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
.gitignore.swp
.project
.settings
+.metals
.DS_Store
/.java-version
.eslintcache
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
index dd9f60e..81260ed 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
@@ -326,22 +326,42 @@ public class JavaNetHttpPollingClient implements
PollingClient {
}
}
- private List<RowData> deserializeSingleValue(byte[] rawBytes) throws
IOException {
- return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes))
- .map(Collections::singletonList)
- .orElse(Collections.emptyList());
+ @VisibleForTesting
+ List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
+ List<RowData> result = new ArrayList<>();
+ responseBodyDecoder.deserialize(rawBytes, new ListCollector(result));
+ return Collections.unmodifiableList(result);
}
- private List<RowData> deserializeArray(byte[] rawBytes) throws IOException
{
+ private static class ListCollector implements
org.apache.flink.util.Collector<RowData> {
+ private final List<RowData> list;
+
+ ListCollector(List<RowData> list) {
+ this.list = list;
+ }
+
+ @Override
+ public void collect(RowData record) {
+ list.add(record);
+ }
+
+ @Override
+ public void close() {
+ // No-op
+ }
+ }
+
+ @VisibleForTesting
+ List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
List<JsonNode> rawObjects = objectMapper.readValue(rawBytes, new
TypeReference<>() {});
List<RowData> result = new ArrayList<>();
for (JsonNode rawObject : rawObjects) {
if (!(rawObject instanceof NullNode)) {
- RowData deserialized =
-
responseBodyDecoder.deserialize(rawObject.toString().getBytes());
- // deserialize() returns null if deserialization fails
- if (deserialized != null) {
- result.add(deserialized);
+ List<RowData> deserialized =
+
deserializeSingleValue(rawObject.toString().getBytes());
+ // deserialize() may return empty list if deserialization fails
+ if (deserialized != null && !deserialized.isEmpty()) {
+ result.addAll(deserialized);
}
}
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
index 9eb7cf6..14689b5 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.BeforeEach;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
@@ -234,4 +236,226 @@ public class JavaNetHttpPollingClientTest {
"no-cache, no-store, max-age=0, must-revalidate");
assertPropertyArray(headersAndValues, "Access-Control-Allow-Origin",
"*");
}
+
+ @Test
+ public void deserializeSingleValueTest() throws ConfigurationException,
IOException {
+ // GIVEN
+ DeserializationSchema<RowData> mockDecoder =
+ new DeserializationSchema<RowData>() {
+ @Override
+ public RowData deserialize(byte[] message) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData>
out)
+ throws IOException {
+ String msg = new String(message);
+
out.collect(GenericRowData.of(StringData.fromString(msg)));
+ out.close();
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+ getProducedType() {
+ return null;
+ }
+ };
+
+ JavaNetHttpPollingClient client =
+ new JavaNetHttpPollingClient(
+ httpClient,
+ mockDecoder,
+ options,
+ new GetRequestFactory(
+ new GenericGetQueryCreator(lookupRow),
+ headerPreprocessor,
+ options));
+
+ // WHEN
+ String testString = "Test1";
+ List<RowData> result =
client.deserializeSingleValue(testString.getBytes());
+
+ // THEN
+ assertThat(result).hasSize(1);
+ assertThat(((StringData)
result.get(0).getString(0)).toString()).isEqualTo(testString);
+ }
+
+ @Test
+ public void shouldDeserializeArrayWithValidObjects() throws Exception {
+ // GIVEN
+ DeserializationSchema<RowData> mockDecoder =
+ new DeserializationSchema<RowData>() {
+ @Override
+ public RowData deserialize(byte[] message) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData>
out)
+ throws IOException {
+ String msg = new String(message);
+ if (msg.contains("value1")) {
+
out.collect(GenericRowData.of(StringData.fromString("row1")));
+ } else if (msg.contains("value2")) {
+
out.collect(GenericRowData.of(StringData.fromString("row2")));
+ }
+ out.close();
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+ getProducedType() {
+ return null;
+ }
+ };
+
+ Properties properties = new Properties();
+ properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE,
"array");
+
+ HttpLookupConfig lookupConfig =
+
HttpLookupConfig.builder().url(BASE_URL).properties(properties).build();
+
+ JavaNetHttpPollingClient client =
+ new JavaNetHttpPollingClient(
+ httpClient,
+ mockDecoder,
+ lookupConfig,
+ new GetRequestFactory(
+ new GenericGetQueryCreator(lookupRow),
+ headerPreprocessor,
+ lookupConfig));
+
+ // WHEN
+ String jsonArray = "[{\"key\":\"value1\"},{\"key\":\"value2\"}]";
+ List<RowData> result = client.deserializeArray(jsonArray.getBytes());
+
+ // THEN
+ assertThat(result).isNotNull();
+ assertThat(result).hasSize(2);
+ }
+
+ @Test
+ public void shouldHandleNullNodesInArray() throws Exception {
+ // GIVEN
+ DeserializationSchema<RowData> mockDecoder =
+ new DeserializationSchema<RowData>() {
+ @Override
+ public RowData deserialize(byte[] message) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData>
out)
+ throws IOException {
+
out.collect(GenericRowData.of(StringData.fromString("valid")));
+ out.close();
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+ getProducedType() {
+ return null;
+ }
+ };
+
+ Properties properties = new Properties();
+ properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE,
"array");
+
+ HttpLookupConfig lookupConfig =
+
HttpLookupConfig.builder().url(BASE_URL).properties(properties).build();
+
+ JavaNetHttpPollingClient client =
+ new JavaNetHttpPollingClient(
+ httpClient,
+ mockDecoder,
+ lookupConfig,
+ new GetRequestFactory(
+ new GenericGetQueryCreator(lookupRow),
+ headerPreprocessor,
+ lookupConfig));
+
+ // WHEN
+ String jsonArray = "[{\"key\":\"value1\"},null,{\"key\":\"value2\"}]";
+ List<RowData> result = client.deserializeArray(jsonArray.getBytes());
+
+ // THEN - null nodes should be skipped
+ assertThat(result).isNotNull();
+ assertThat(result).hasSize(2);
+ }
+
+ @Test
+ public void shouldHandleEmptyDeserializationInArray() throws Exception {
+ // GIVEN
+ DeserializationSchema<RowData> mockDecoder =
+ new DeserializationSchema<RowData>() {
+ @Override
+ public RowData deserialize(byte[] message) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData>
out)
+ throws IOException {
+ String msg = new String(message);
+ // Only collect for specific messages, return empty
for others
+ if (msg.contains("\"status\":\"valid\"")) {
+
out.collect(GenericRowData.of(StringData.fromString("data")));
+ }
+ // Don't collect anything for other messages
+ out.close();
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+ getProducedType() {
+ return null;
+ }
+ };
+
+ Properties properties = new Properties();
+ properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE,
"array");
+
+ HttpLookupConfig lookupConfig =
+
HttpLookupConfig.builder().url(BASE_URL).properties(properties).build();
+
+ JavaNetHttpPollingClient client =
+ new JavaNetHttpPollingClient(
+ httpClient,
+ mockDecoder,
+ lookupConfig,
+ new GetRequestFactory(
+ new GenericGetQueryCreator(lookupRow),
+ headerPreprocessor,
+ lookupConfig));
+
+ // WHEN
+ String jsonArray =
+
"[{\"status\":\"invalid\"},{\"status\":\"valid\"},{\"status\":\"invalid\"}]";
+ List<RowData> result = client.deserializeArray(jsonArray.getBytes());
+
+ // THEN - only valid deserialization should be included
+ assertThat(result).isNotNull();
+ assertThat(result).hasSize(1);
+ }
}