This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 444732e25ab [FLINK-18590][json] Support json array explode to multi
messages (#26473)
444732e25ab is described below
commit 444732e25ab87a1d0e2835049b6f1513f7b3df4a
Author: Hongjia Liang <[email protected]>
AuthorDate: Tue Apr 29 09:45:25 2025 +0800
[FLINK-18590][json] Support json array explode to multi messages (#26473)
* [FLINK-18590][json] Support json array explode to multi messages
---------
Co-authored-by: Benchao Li <[email protected]>
---
.../docs/connectors/table/formats/json.md | 31 ++++++++++
docs/content/docs/connectors/table/formats/json.md | 29 +++++++++
.../json/AbstractJsonDeserializationSchema.java | 35 +++++++++++
.../JsonParserRowDataDeserializationSchema.java | 31 +++++++---
.../json/JsonRowDataDeserializationSchema.java | 36 ++++++++---
.../formats/json/JsonRowDataSerDeSchemaTest.java | 72 ++++++++++++++++++++++
6 files changed, 219 insertions(+), 15 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/formats/json.md
b/docs/content.zh/docs/connectors/table/formats/json.md
index 005485a7a0a..3869144949b 100644
--- a/docs/content.zh/docs/connectors/table/formats/json.md
+++ b/docs/content.zh/docs/connectors/table/formats/json.md
@@ -243,3 +243,34 @@ Format 参数
</tr>
</tbody>
</table>
+
+特性
+--------
+
+### 允许 json array 直接展开成多行数据
+
+通常,我们假设 JSON 的最外层数据是一个 JSON Object。所以一条 JSON 会转换成一行结果。
+
+但是在某些情况下 JSON 的最外层数据可能是一个 JSON Array,我们期望它可以被展开成多条结果。 JSON Array 的每个元素都是一个
JSON Object, 这些 JSON Object 的 schema 需要和 SQL 定义一致。然后每个 JSON Object
会被转成一行结果。Flink JSON Format 支持对这种情况的默认处理。
+
+例如,对于如下 DDL:
+```sql
+CREATE TABLE user_behavior (
+ col1 BIGINT,
+ col2 VARCHAR
+) WITH (
+ 'format' = 'json',
+ ...
+)
+```
+
+以下两种情况下 Flink JSON Format 都将会产生两条数据 `(123, "a")` 和 `(456, "b")`。
+最外层是一个 JSON Array:
+```json lines
+[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
+```
+最外层是一个 JSON Object:
+```json lines
+{"col1": 123, "col2": "a"}
+{"col1": 456, "col2": "b"}
+```
diff --git a/docs/content/docs/connectors/table/formats/json.md
b/docs/content/docs/connectors/table/formats/json.md
index 64592ac28be..7ead1ab90d8 100644
--- a/docs/content/docs/connectors/table/formats/json.md
+++ b/docs/content/docs/connectors/table/formats/json.md
@@ -257,6 +257,35 @@ The following table lists the type mapping from Flink type
to JSON type.
</tbody>
</table>
+Features
+--------
+### Allow top-level JSON Arrays
+
+Usually, we assume the top level of a JSON string is a stringified JSON object.
Then this stringified JSON object can be converted into one SQL row.
+
+There are some cases where the top level of a JSON string is a stringified JSON
array, and we want to explode the array into multiple records. Each element
within the array is a JSON object; the schema of every such JSON object is the
same as defined in SQL, and each of these JSON objects can be converted into
one row. Flink JSON Format supports reading such data.
+
+For example, for the following SQL DDL:
+```sql
+CREATE TABLE user_behavior (
+ col1 BIGINT,
+ col2 VARCHAR
+) WITH (
+ 'format' = 'json',
+ ...
+)
+```
+
+Flink JSON Format will produce 2 rows `(123, "a")` and `(456, "b")` for both
of the following two JSON strings.
+The top level is a JSON Array:
+```json lines
+[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
+```
+The top level is a JSON Object:
+```json lines
+{"col1": 123, "col2": "a"}
+{"col1": 456, "col2": "b"}
+```
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
index aa62e0d5f87..88c5e9d4c1d 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.json;
+import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
@@ -25,12 +26,19 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,6 +70,10 @@ public abstract class AbstractJsonDeserializationSchema
implements Deserializati
private final boolean hasDecimalType;
+ private transient Collector<RowData> collector;
+
+ private transient List<RowData> reusableCollectList;
+
public AbstractJsonDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
@@ -89,6 +101,29 @@ public abstract class AbstractJsonDeserializationSchema
implements Deserializati
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
+ reusableCollectList = new ArrayList<>();
+ collector = new ListCollector<>(reusableCollectList);
+ }
+
+ /**
+ * @deprecated Use {@link DeserializationSchema#deserialize(byte[],
Collector)} instead. The
+ * implementation of {@link
AbstractJsonDeserializationSchema#deserialize(byte[])} will be
+ * removed in <a
href="https://issues.apache.org/jira/browse/FLINK-37707">FLINK-37707</a>.
+ */
+ @Deprecated
+ @Override
+ public RowData deserialize(@Nullable byte[] message) throws IOException {
+ reusableCollectList.clear();
+ deserialize(message, collector);
+ if (reusableCollectList.size() > 1) {
+ throw new FlinkRuntimeException(
+ "Please invoke "
+ + "DeserializationSchema#deserialize(byte[],
Collector<RowData>) instead.");
+ }
+ if (reusableCollectList.isEmpty()) {
+ return null;
+ }
+ return reusableCollectList.get(0);
}
@Override
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
index 22df48f2ac2..eea1f607f1c 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
@@ -72,10 +73,10 @@ public class JsonParserRowDataDeserializationSchema extends
AbstractJsonDeserial
}
@Override
- public RowData deserialize(byte[] message) throws IOException {
+ public void deserialize(byte[] message, Collector<RowData> out) throws
IOException {
// return null when there is no token
if (message == null || message.length == 0) {
- return null;
+ return;
}
try (JsonParser root =
objectMapper.getFactory().createParser(message)) {
/* First: must point to a token; if not pointing to one, advance.
@@ -85,16 +86,30 @@ public class JsonParserRowDataDeserializationSchema extends
AbstractJsonDeserial
if (root.currentToken() == null) {
root.nextToken();
}
- if (root.currentToken() != JsonToken.START_OBJECT) {
+ if (root.currentToken() != JsonToken.START_OBJECT
+ && root.currentToken() != JsonToken.START_ARRAY) {
throw JsonMappingException.from(root, "No content to map due
to end-of-input");
}
- return (RowData) runtimeConverter.convert(root);
+ if (root.currentToken() == JsonToken.START_ARRAY) {
+ processArray(root, out);
+ } else {
+ processObject(root, out);
+ }
} catch (Throwable t) {
- if (ignoreParseErrors) {
- return null;
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format("Failed to deserialize JSON '%s'.", new
String(message)), t);
}
- throw new IOException(
- format("Failed to deserialize JSON '%s'.", new
String(message)), t);
}
}
+
+ private void processArray(JsonParser root, Collector<RowData> out) throws
IOException {
+ while (root.nextToken() != JsonToken.END_ARRAY) {
+ out.collect((RowData) runtimeConverter.convert(root));
+ }
+ }
+
+ private void processObject(JsonParser root, Collector<RowData> out) throws
IOException {
+ out.collect((RowData) runtimeConverter.convert(root));
+ }
}
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 5a3fe22b308..e8f76eca52a 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -23,8 +23,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import javax.annotation.Nullable;
@@ -63,18 +65,38 @@ public class JsonRowDataDeserializationSchema extends
AbstractJsonDeserializatio
}
@Override
- public RowData deserialize(@Nullable byte[] message) throws IOException {
+ public void deserialize(@Nullable byte[] message, Collector<RowData> out)
throws IOException {
if (message == null) {
- return null;
+ return;
}
try {
- return convertToRowData(deserializeToJsonNode(message));
+ final JsonNode root = deserializeToJsonNode(message);
+ if (root != null && root.isArray()) {
+ ArrayNode arrayNode = (ArrayNode) root;
+ for (int i = 0; i < arrayNode.size(); i++) {
+ try {
+ RowData result = convertToRowData(arrayNode.get(i));
+ if (result != null) {
+ out.collect(result);
+ }
+ } catch (Throwable t) {
+ if (!ignoreParseErrors) {
+ // will be caught by outer try-catch
+ throw t;
+ }
+ }
+ }
+ } else {
+ RowData result = convertToRowData(root);
+ if (result != null) {
+ out.collect(result);
+ }
+ }
} catch (Throwable t) {
- if (ignoreParseErrors) {
- return null;
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format("Failed to deserialize JSON '%s'.", new
String(message)), t);
}
- throw new IOException(
- format("Failed to deserialize JSON '%s'.", new
String(message)), t);
}
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 916b04f50f8..94f59dbba36 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.json;
+import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.core.testutils.FlinkAssertions;
@@ -34,6 +35,7 @@ import
org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,6 +53,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -224,6 +227,75 @@ public class JsonRowDataSerDeSchemaTest {
assertThat(serializedJson).containsExactly(actualBytes);
}
+ @Test
+ public void testEmptyJsonArrayDeserialization() throws Exception {
+ DataType dataType = ROW(FIELD("f1", INT()), FIELD("f2", BOOLEAN()),
FIELD("f3", STRING()));
+ RowType rowType = (RowType) dataType.getLogicalType();
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ ArrayNode arrayNode = objectMapper.createArrayNode();
+
+ DeserializationSchema<RowData> deserializationSchema =
+ createDeserializationSchema(
+ isJsonParser, rowType, false, false,
TimestampFormat.ISO_8601);
+
+ open(deserializationSchema);
+
+ List<RowData> result = new ArrayList<>();
+ Collector<RowData> collector = new ListCollector<>(result);
+
deserializationSchema.deserialize(objectMapper.writeValueAsBytes(arrayNode),
collector);
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ public void testJsonArrayToMultiRecords() throws Exception {
+ DataType dataType = ROW(FIELD("f1", INT()), FIELD("f2", BOOLEAN()),
FIELD("f3", STRING()));
+ RowType rowType = (RowType) dataType.getLogicalType();
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ ObjectNode element1 = objectMapper.createObjectNode();
+ element1.put("f1", 1);
+ element1.put("f2", true);
+ element1.put("f3", "str");
+
+ ObjectNode element2 = objectMapper.createObjectNode();
+ element2.put("f1", 10);
+ element2.put("f2", false);
+ element2.put("f3", "newStr");
+
+ ArrayNode arrayNode = objectMapper.createArrayNode();
+ arrayNode.add(element1);
+ arrayNode.add(element2);
+
+ DeserializationSchema<RowData> deserializationSchema =
+ createDeserializationSchema(
+ isJsonParser, rowType, false, false,
TimestampFormat.ISO_8601);
+
+ open(deserializationSchema);
+
+ // test serialization
+ JsonRowDataSerializationSchema serializationSchema =
+ new JsonRowDataSerializationSchema(
+ rowType,
+ TimestampFormat.ISO_8601,
+ JsonFormatOptions.MapNullKeyMode.LITERAL,
+ "null",
+ true,
+ false);
+ open(serializationSchema);
+
+ List<RowData> result = new ArrayList<>();
+ Collector<RowData> collector = new ListCollector<>(result);
+
deserializationSchema.deserialize(objectMapper.writeValueAsBytes(arrayNode),
collector);
+ assertThat(result).hasSize(2);
+
+ byte[] result1 = serializationSchema.serialize(result.get(0));
+ byte[] result2 = serializationSchema.serialize(result.get(1));
+
assertThat(result1).isEqualTo(objectMapper.writeValueAsBytes(element1));
+
assertThat(result2).isEqualTo(objectMapper.writeValueAsBytes(element2));
+ }
+
/**
* Tests the deserialization slow path, e.g. convert into string and use
{@link
* Double#parseDouble(String)}.