This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new b12f8ef [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when
emitting `NULL`
b12f8ef is described below
commit b12f8efad939c1654142b893d8f630646956387e
Author: Marios Trivyzas <[email protected]>
AuthorDate: Sun Mar 27 11:29:12 2022 +0300
[FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`
Previously, during JSON aggregation with JsonOnNull.NULL selected (which
means that we still want to emit a `null` JSON node, i.e.
`{.... "myField" : null ... }`, for a key whose value is NULL), we stored
a null `StringData` object in the accumulator map. When
`state.backend.changelog.enabled` is enabled, the contents of the map
accumulating the aggregated records get serialized, leading to an NPE,
since `null` is not supported by `StringDataSerializer`.
To solve this, we instead store a StringData with an empty `byte[]`,
which denotes null. When the aggregation ends and we create the final
JSON result, we check for a `byte[]` of length `0` in order to write
the JSON `null` node.
---
.../BuiltInAggregateFunctionTestBase.java | 16 +-
.../functions/JsonAggregationFunctionsITCase.java | 272 ++++++++++++---------
.../functions/aggregate/JsonObjectAggFunction.java | 7 +-
3 files changed, 169 insertions(+), 126 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
index ba2f01e..fc65b91 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
@@ -78,8 +78,7 @@ public class BuiltInAggregateFunctionTestBase {
public void testFunction() throws Exception {
final TableEnvironment tEnv =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
- // see https://issues.apache.org/jira/browse/FLINK-26092
- tEnv.getConfig().set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG,
false);
+ tEnv.getConfig().set(StateBackendOptions.STATE_BACKEND,
testSpec.backendName);
final Table sourceTable = asTable(tEnv, testSpec.sourceRowType,
testSpec.sourceRows);
for (final TestItem testItem : testSpec.testItems) {
@@ -175,6 +174,7 @@ public class BuiltInAggregateFunctionTestBase {
private DataType sourceRowType;
private List<Row> sourceRows;
+ private String backendName;
private TestSpec(BuiltInFunctionDefinition definition) {
this.definition = Preconditions.checkNotNull(definition);
@@ -195,6 +195,15 @@ public class BuiltInAggregateFunctionTestBase {
return this;
}
+ public TestSpec withStateBackend(String backendName) {
+ this.backendName = backendName;
+ return this;
+ }
+
+ /**
+ * Creates an independent copy of this spec: definition, description, source row type,
+ * source rows, test items and state backend are all carried over, so the copy can be
+ * re-parameterized (e.g. via {@link #withStateBackend(String)}) without touching this
+ * instance.
+ */
+ public TestSpec copy() {
+ final TestSpec copy = new TestSpec(definition);
+ copy.description = description;
+ copy.sourceRowType = sourceRowType;
+ copy.sourceRows = sourceRows == null ? null : new ArrayList<>(sourceRows);
+ // Specs are copied after testResult(...) was called; without carrying the items over,
+ // the copied spec would run no assertions at all and pass vacuously.
+ copy.testItems.addAll(testItems);
+ copy.backendName = backendName;
+ return copy;
+ }
+
TestSpec testSqlResult(
Function<Table, String> sqlSpec, DataType expectedRowType,
List<Row> expectedRows) {
this.testItems.add(new SqlTestItem(sqlSpec, expectedRowType,
expectedRows));
@@ -238,6 +247,7 @@ public class BuiltInAggregateFunctionTestBase {
bob.append(description);
bob.append(")");
}
+ bob.append(", backend: ").append(backendName);
return bob.toString();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
index 9209a49..b0ffd6c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
@@ -27,7 +27,11 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static
org.apache.flink.runtime.state.StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
+import static
org.apache.flink.runtime.state.StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
@@ -43,128 +47,154 @@ public class JsonAggregationFunctionsITCase extends
BuiltInAggregateFunctionTest
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() throws Exception {
- return Arrays.asList(
- // JSON_OBJECTAGG
-
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
- .withDescription("Basic Aggregation")
- .withSource(
- ROW(STRING(), INT()),
- Arrays.asList(
- Row.ofKind(INSERT, "A", 1),
- Row.ofKind(INSERT, "B", null),
- Row.ofKind(INSERT, "C", 3)))
- .testResult(
- source -> "SELECT JSON_OBJECTAGG(f0 VALUE f1)
FROM " + source,
- source ->
- source.select(
- jsonObjectAgg(JsonOnNull.NULL,
$("f0"), $("f1"))),
- ROW(VARCHAR(2000).notNull()),
- ROW(STRING().notNull()),
-
Collections.singletonList(Row.of("{\"A\":1,\"B\":null,\"C\":3}"))),
-
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL)
- .withDescription("Omits NULLs")
- .withSource(
- ROW(STRING(), INT()),
- Arrays.asList(
- Row.ofKind(INSERT, "A", 1),
- Row.ofKind(INSERT, "B", null),
- Row.ofKind(INSERT, "C", 3)))
- .testResult(
- source ->
- "SELECT JSON_OBJECTAGG(f0 VALUE f1
ABSENT ON NULL) FROM "
- + source,
- source ->
- source.select(
-
jsonObjectAgg(JsonOnNull.ABSENT, $("f0"), $("f1"))),
- ROW(VARCHAR(2000).notNull()),
- ROW(STRING().notNull()),
-
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
-
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
- .withDescription("Retractions")
- .withSource(
- ROW(STRING(), INT()),
- Arrays.asList(
- Row.ofKind(INSERT, "A", 1),
- Row.ofKind(INSERT, "B", 2),
- Row.ofKind(INSERT, "C", 3),
- Row.ofKind(DELETE, "B", 2)))
- .testResult(
- source -> "SELECT JSON_OBJECTAGG(f0 VALUE f1)
FROM " + source,
- source ->
- source.select(
- jsonObjectAgg(JsonOnNull.NULL,
$("f0"), $("f1"))),
- ROW(VARCHAR(2000).notNull()),
- ROW(STRING().notNull()),
-
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
-
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
- .withDescription("Group Aggregation")
- .withSource(
- ROW(INT(), STRING(), INT()),
- Arrays.asList(
- Row.ofKind(INSERT, 1, "A", 0),
- Row.ofKind(INSERT, 1, "B", 0),
- Row.ofKind(INSERT, 2, "A", 0),
- Row.ofKind(INSERT, 2, "C", 0)))
- .testResult(
- source ->
- "SELECT f0, JSON_OBJECTAGG(f1 VALUE
f2) FROM "
- + source
- + " GROUP BY f0",
- source ->
- source.groupBy($("f0"))
- .select(
- $("f0"),
+ return Stream.of(
+ // JSON_OBJECTAGG
+
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+ .withDescription("Basic Aggregation")
+ .withSource(
+ ROW(STRING(), INT()),
+ Arrays.asList(
+ Row.ofKind(INSERT, "A", 1),
+ Row.ofKind(INSERT, "B", null),
+ Row.ofKind(INSERT, "C", 3)))
+ .testResult(
+ source ->
+ "SELECT JSON_OBJECTAGG(f0
VALUE f1) FROM " + source,
+ source ->
+ source.select(
jsonObjectAgg(
-
JsonOnNull.NULL, $("f1"), $("f2"))),
- ROW(INT(), VARCHAR(2000).notNull()),
- ROW(INT(), STRING().notNull()),
- Arrays.asList(
- Row.of(1, "{\"A\":0,\"B\":0}"),
- Row.of(2, "{\"A\":0,\"C\":0}"))),
+
JsonOnNull.NULL, $("f0"), $("f1"))),
+ ROW(VARCHAR(2000).notNull()),
+ ROW(STRING().notNull()),
+ Collections.singletonList(
+
Row.of("{\"A\":1,\"B\":null,\"C\":3}"))),
+ TestSpec.forFunction(
+
BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL)
+ .withDescription("Omits NULLs")
+ .withSource(
+ ROW(STRING(), INT()),
+ Arrays.asList(
+ Row.ofKind(INSERT, "A", 1),
+ Row.ofKind(INSERT, "B", null),
+ Row.ofKind(INSERT, "C", 3)))
+ .testResult(
+ source ->
+ "SELECT JSON_OBJECTAGG(f0
VALUE f1 ABSENT ON NULL) FROM "
+ + source,
+ source ->
+ source.select(
+ jsonObjectAgg(
+
JsonOnNull.ABSENT,
+ $("f0"),
+ $("f1"))),
+ ROW(VARCHAR(2000).notNull()),
+ ROW(STRING().notNull()),
+
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
+
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+ .withDescription("Retractions")
+ .withSource(
+ ROW(STRING(), INT()),
+ Arrays.asList(
+ Row.ofKind(INSERT, "A", 1),
+ Row.ofKind(INSERT, "B", 2),
+ Row.ofKind(INSERT, "C", 3),
+ Row.ofKind(DELETE, "B", 2)))
+ .testResult(
+ source ->
+ "SELECT JSON_OBJECTAGG(f0
VALUE f1) FROM " + source,
+ source ->
+ source.select(
+ jsonObjectAgg(
+
JsonOnNull.NULL, $("f0"), $("f1"))),
+ ROW(VARCHAR(2000).notNull()),
+ ROW(STRING().notNull()),
+
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
+
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+ .withDescription("Group Aggregation")
+ .withSource(
+ ROW(INT(), STRING(), INT()),
+ Arrays.asList(
+ Row.ofKind(INSERT, 1, "A", 0),
+ Row.ofKind(INSERT, 1, "B", 0),
+ Row.ofKind(INSERT, 2, "A", 0),
+ Row.ofKind(INSERT, 2, "C", 0)))
+ .testResult(
+ source ->
+ "SELECT f0, JSON_OBJECTAGG(f1
VALUE f2) FROM "
+ + source
+ + " GROUP BY f0",
+ source ->
+ source.groupBy($("f0"))
+ .select(
+ $("f0"),
+ jsonObjectAgg(
+
JsonOnNull.NULL,
+
$("f1"),
+
$("f2"))),
+ ROW(INT(), VARCHAR(2000).notNull()),
+ ROW(INT(), STRING().notNull()),
+ Arrays.asList(
+ Row.of(1, "{\"A\":0,\"B\":0}"),
+ Row.of(2,
"{\"A\":0,\"C\":0}"))),
- // JSON_ARRAYAGG
-
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
- .withDescription("Basic Aggregation")
- .withSource(
- ROW(STRING()),
- Arrays.asList(
- Row.ofKind(INSERT, "A"),
- Row.ofKind(INSERT, (String) null),
- Row.ofKind(INSERT, "C")))
- .testResult(
- source -> "SELECT JSON_ARRAYAGG(f0) FROM " +
source,
- source ->
source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
- ROW(VARCHAR(2000).notNull()),
- ROW(STRING().notNull()),
-
Collections.singletonList(Row.of("[\"A\",\"C\"]"))),
-
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL)
- .withDescription("Keeps NULLs")
- .withSource(
- ROW(STRING()),
- Arrays.asList(
- Row.ofKind(INSERT, "A"),
- Row.ofKind(INSERT, (String) null),
- Row.ofKind(INSERT, "C")))
- .testResult(
- source -> "SELECT JSON_ARRAYAGG(f0 NULL ON
NULL) FROM " + source,
- source ->
source.select(jsonArrayAgg(JsonOnNull.NULL, $("f0"))),
- ROW(VARCHAR(2000).notNull()),
- ROW(STRING().notNull()),
-
Collections.singletonList(Row.of("[\"A\",null,\"C\"]"))),
-
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
- .withDescription("Retractions")
- .withSource(
- ROW(INT()),
- Arrays.asList(
- Row.ofKind(INSERT, 1),
- Row.ofKind(INSERT, 2),
- Row.ofKind(INSERT, 3),
- Row.ofKind(DELETE, 2)))
- .testResult(
- source -> "SELECT JSON_ARRAYAGG(f0) FROM " +
source,
- source ->
source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
- ROW(VARCHAR(2000).notNull()),
- ROW(STRING().notNull()),
- Collections.singletonList(Row.of("[1,3]"))));
+ // JSON_ARRAYAGG
+ TestSpec.forFunction(
+
BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+ .withDescription("Basic Aggregation")
+ .withSource(
+ ROW(STRING()),
+ Arrays.asList(
+ Row.ofKind(INSERT, "A"),
+ Row.ofKind(INSERT, (String)
null),
+ Row.ofKind(INSERT, "C")))
+ .testResult(
+ source -> "SELECT JSON_ARRAYAGG(f0)
FROM " + source,
+ source ->
+ source.select(
+
jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+ ROW(VARCHAR(2000).notNull()),
+ ROW(STRING().notNull()),
+
Collections.singletonList(Row.of("[\"A\",\"C\"]"))),
+
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL)
+ .withDescription("Keeps NULLs")
+ .withSource(
+ ROW(STRING()),
+ Arrays.asList(
+ Row.ofKind(INSERT, "A"),
+ Row.ofKind(INSERT, (String)
null),
+ Row.ofKind(INSERT, "C")))
+ .testResult(
+ source ->
+ "SELECT JSON_ARRAYAGG(f0 NULL
ON NULL) FROM "
+ + source,
+ source ->
+ source.select(
+
jsonArrayAgg(JsonOnNull.NULL, $("f0"))),
+ ROW(VARCHAR(2000).notNull()),
+ ROW(STRING().notNull()),
+
Collections.singletonList(Row.of("[\"A\",null,\"C\"]"))),
+ TestSpec.forFunction(
+
BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+ .withDescription("Retractions")
+ .withSource(
+ ROW(INT()),
+ Arrays.asList(
+ Row.ofKind(INSERT, 1),
+ Row.ofKind(INSERT, 2),
+ Row.ofKind(INSERT, 3),
+ Row.ofKind(DELETE, 2)))
+ .testResult(
+ source -> "SELECT JSON_ARRAYAGG(f0)
FROM " + source,
+ source ->
+ source.select(
+
jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+ ROW(VARCHAR(2000).notNull()),
+ ROW(STRING().notNull()),
+
Collections.singletonList(Row.of("[1,3]"))))
+ .flatMap(
+ spec ->
+ Stream.of(HASHMAP_STATE_BACKEND_NAME,
ROCKSDB_STATE_BACKEND_NAME)
+ .map(backend ->
spec.copy().withStateBackend(backend)))
+ .collect(Collectors.toList());
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
index 8152f98..4be69c9 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
@@ -57,6 +57,7 @@ public class JsonObjectAggFunction
extends BuiltInAggregateFunction<String,
JsonObjectAggFunction.Accumulator> {
private static final long serialVersionUID = 1L;
+ private static final StringData NULL_STRING_DATA =
StringData.fromBytes(new byte[] {});
private static final NullNode NULL_NODE = getNodeFactory().nullNode();
private final transient List<DataType> argumentTypes;
@@ -107,7 +108,9 @@ public class JsonObjectAggFunction
if (valueData == null) {
if (!skipNulls) {
- acc.map.put(keyData, null);
+ // We cannot use null for StringData here, since it's not
supported by the
+ // StringDataSerializer, instead use a StringData with an
empty byte[]
+ acc.map.put(keyData, NULL_STRING_DATA);
}
} else {
acc.map.put(keyData, valueData);
@@ -135,7 +138,7 @@ public class JsonObjectAggFunction
for (final StringData key : acc.map.keys()) {
final StringData value = acc.map.get(key);
final JsonNode valueNode =
- value == null
+ value.toBytes().length == 0
? NULL_NODE
: getNodeFactory().rawValueNode(new
RawValue(value.toString()));