This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new b12f8ef  [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when 
emitting `NULL`
b12f8ef is described below

commit b12f8efad939c1654142b893d8f630646956387e
Author: Marios Trivyzas <[email protected]>
AuthorDate: Sun Mar 27 11:29:12 2022 +0300

    [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`
    
    Previously, when the JSON aggregation was taking place and
    `JsonOnNull.NULL` was selected, meaning that we still want to emit
    a `null` JSON node, i.e. `{.... "myField" : null ... }`, when a `NULL`
    value gets accumulated for a key, we used a null `StringData` object.
    When `state.backend.changelog.enabled` is enabled, the contents of the
    map accumulating the aggregated records get serialized, leading to an
    NPE, since `null` is not supported by `StringDataSerializer`.
    
    To solve this, we instead create a StringData with an empty `byte[]`,
    which denotes the null, and when the aggregation ends and we create
    the final JSON result, we check for a `byte[]` of `length` `0` in
    order to write the JSON `null` node.
---
 .../BuiltInAggregateFunctionTestBase.java          |  16 +-
 .../functions/JsonAggregationFunctionsITCase.java  | 272 ++++++++++++---------
 .../functions/aggregate/JsonObjectAggFunction.java |   7 +-
 3 files changed, 169 insertions(+), 126 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
index ba2f01e..fc65b91 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.functions;
 
-import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -78,8 +78,7 @@ public class BuiltInAggregateFunctionTestBase {
     public void testFunction() throws Exception {
         final TableEnvironment tEnv =
                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        // see https://issues.apache.org/jira/browse/FLINK-26092
-        tEnv.getConfig().set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, 
false);
+        tEnv.getConfig().set(StateBackendOptions.STATE_BACKEND, 
testSpec.backendName);
         final Table sourceTable = asTable(tEnv, testSpec.sourceRowType, 
testSpec.sourceRows);
 
         for (final TestItem testItem : testSpec.testItems) {
@@ -175,6 +174,7 @@ public class BuiltInAggregateFunctionTestBase {
 
         private DataType sourceRowType;
         private List<Row> sourceRows;
+        private String backendName;
 
         private TestSpec(BuiltInFunctionDefinition definition) {
             this.definition = Preconditions.checkNotNull(definition);
@@ -195,6 +195,26 @@ public class BuiltInAggregateFunctionTestBase {
             return this;
         }
 
+        public TestSpec withStateBackend(String backendName) {
+            this.backendName = backendName;
+            return this;
+        }
+
+        /**
+         * Creates an independent copy of this spec: description, source row type, a fresh list of
+         * source rows, state backend and all registered test items. Without the test items the
+         * copy would run nothing, e.g. when re-parameterizing one spec for several state backends.
+         */
+        public TestSpec copy() {
+            final TestSpec copy = new TestSpec(definition);
+            copy.description = description;
+            copy.sourceRowType = sourceRowType;
+            copy.sourceRows = new ArrayList<>(sourceRows);
+            copy.backendName = backendName;
+            copy.testItems.addAll(testItems);
+            return copy;
+        }
+
         TestSpec testSqlResult(
                 Function<Table, String> sqlSpec, DataType expectedRowType, List<Row> expectedRows) {
             this.testItems.add(new SqlTestItem(sqlSpec, expectedRowType, expectedRows));
@@ -238,6 +258,7 @@ public class BuiltInAggregateFunctionTestBase {
                 bob.append(description);
                 bob.append(")");
             }
+            bob.append(", backend: ").append(backendName);
 
             return bob.toString();
         }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
index 9209a49..b0ffd6c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
@@ -27,7 +27,11 @@ import org.junit.runners.Parameterized;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static 
org.apache.flink.runtime.state.StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
+import static 
org.apache.flink.runtime.state.StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
@@ -43,128 +47,154 @@ public class JsonAggregationFunctionsITCase extends 
BuiltInAggregateFunctionTest
 
     @Parameterized.Parameters(name = "{index}: {0}")
     public static List<TestSpec> testData() throws Exception {
-        return Arrays.asList(
-                // JSON_OBJECTAGG
-                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
-                        .withDescription("Basic Aggregation")
-                        .withSource(
-                                ROW(STRING(), INT()),
-                                Arrays.asList(
-                                        Row.ofKind(INSERT, "A", 1),
-                                        Row.ofKind(INSERT, "B", null),
-                                        Row.ofKind(INSERT, "C", 3)))
-                        .testResult(
-                                source -> "SELECT JSON_OBJECTAGG(f0 VALUE f1) 
FROM " + source,
-                                source ->
-                                        source.select(
-                                                jsonObjectAgg(JsonOnNull.NULL, 
$("f0"), $("f1"))),
-                                ROW(VARCHAR(2000).notNull()),
-                                ROW(STRING().notNull()),
-                                
Collections.singletonList(Row.of("{\"A\":1,\"B\":null,\"C\":3}"))),
-                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL)
-                        .withDescription("Omits NULLs")
-                        .withSource(
-                                ROW(STRING(), INT()),
-                                Arrays.asList(
-                                        Row.ofKind(INSERT, "A", 1),
-                                        Row.ofKind(INSERT, "B", null),
-                                        Row.ofKind(INSERT, "C", 3)))
-                        .testResult(
-                                source ->
-                                        "SELECT JSON_OBJECTAGG(f0 VALUE f1 
ABSENT ON NULL) FROM "
-                                                + source,
-                                source ->
-                                        source.select(
-                                                
jsonObjectAgg(JsonOnNull.ABSENT, $("f0"), $("f1"))),
-                                ROW(VARCHAR(2000).notNull()),
-                                ROW(STRING().notNull()),
-                                
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
-                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
-                        .withDescription("Retractions")
-                        .withSource(
-                                ROW(STRING(), INT()),
-                                Arrays.asList(
-                                        Row.ofKind(INSERT, "A", 1),
-                                        Row.ofKind(INSERT, "B", 2),
-                                        Row.ofKind(INSERT, "C", 3),
-                                        Row.ofKind(DELETE, "B", 2)))
-                        .testResult(
-                                source -> "SELECT JSON_OBJECTAGG(f0 VALUE f1) 
FROM " + source,
-                                source ->
-                                        source.select(
-                                                jsonObjectAgg(JsonOnNull.NULL, 
$("f0"), $("f1"))),
-                                ROW(VARCHAR(2000).notNull()),
-                                ROW(STRING().notNull()),
-                                
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
-                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
-                        .withDescription("Group Aggregation")
-                        .withSource(
-                                ROW(INT(), STRING(), INT()),
-                                Arrays.asList(
-                                        Row.ofKind(INSERT, 1, "A", 0),
-                                        Row.ofKind(INSERT, 1, "B", 0),
-                                        Row.ofKind(INSERT, 2, "A", 0),
-                                        Row.ofKind(INSERT, 2, "C", 0)))
-                        .testResult(
-                                source ->
-                                        "SELECT f0, JSON_OBJECTAGG(f1 VALUE 
f2) FROM "
-                                                + source
-                                                + " GROUP BY f0",
-                                source ->
-                                        source.groupBy($("f0"))
-                                                .select(
-                                                        $("f0"),
+        return Stream.of(
+                        // JSON_OBJECTAGG
+                        
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+                                .withDescription("Basic Aggregation")
+                                .withSource(
+                                        ROW(STRING(), INT()),
+                                        Arrays.asList(
+                                                Row.ofKind(INSERT, "A", 1),
+                                                Row.ofKind(INSERT, "B", null),
+                                                Row.ofKind(INSERT, "C", 3)))
+                                .testResult(
+                                        source ->
+                                                "SELECT JSON_OBJECTAGG(f0 
VALUE f1) FROM " + source,
+                                        source ->
+                                                source.select(
                                                         jsonObjectAgg(
-                                                                
JsonOnNull.NULL, $("f1"), $("f2"))),
-                                ROW(INT(), VARCHAR(2000).notNull()),
-                                ROW(INT(), STRING().notNull()),
-                                Arrays.asList(
-                                        Row.of(1, "{\"A\":0,\"B\":0}"),
-                                        Row.of(2, "{\"A\":0,\"C\":0}"))),
+                                                                
JsonOnNull.NULL, $("f0"), $("f1"))),
+                                        ROW(VARCHAR(2000).notNull()),
+                                        ROW(STRING().notNull()),
+                                        Collections.singletonList(
+                                                
Row.of("{\"A\":1,\"B\":null,\"C\":3}"))),
+                        TestSpec.forFunction(
+                                        
BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL)
+                                .withDescription("Omits NULLs")
+                                .withSource(
+                                        ROW(STRING(), INT()),
+                                        Arrays.asList(
+                                                Row.ofKind(INSERT, "A", 1),
+                                                Row.ofKind(INSERT, "B", null),
+                                                Row.ofKind(INSERT, "C", 3)))
+                                .testResult(
+                                        source ->
+                                                "SELECT JSON_OBJECTAGG(f0 
VALUE f1 ABSENT ON NULL) FROM "
+                                                        + source,
+                                        source ->
+                                                source.select(
+                                                        jsonObjectAgg(
+                                                                
JsonOnNull.ABSENT,
+                                                                $("f0"),
+                                                                $("f1"))),
+                                        ROW(VARCHAR(2000).notNull()),
+                                        ROW(STRING().notNull()),
+                                        
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
+                        
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+                                .withDescription("Retractions")
+                                .withSource(
+                                        ROW(STRING(), INT()),
+                                        Arrays.asList(
+                                                Row.ofKind(INSERT, "A", 1),
+                                                Row.ofKind(INSERT, "B", 2),
+                                                Row.ofKind(INSERT, "C", 3),
+                                                Row.ofKind(DELETE, "B", 2)))
+                                .testResult(
+                                        source ->
+                                                "SELECT JSON_OBJECTAGG(f0 
VALUE f1) FROM " + source,
+                                        source ->
+                                                source.select(
+                                                        jsonObjectAgg(
+                                                                
JsonOnNull.NULL, $("f0"), $("f1"))),
+                                        ROW(VARCHAR(2000).notNull()),
+                                        ROW(STRING().notNull()),
+                                        
Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
+                        
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+                                .withDescription("Group Aggregation")
+                                .withSource(
+                                        ROW(INT(), STRING(), INT()),
+                                        Arrays.asList(
+                                                Row.ofKind(INSERT, 1, "A", 0),
+                                                Row.ofKind(INSERT, 1, "B", 0),
+                                                Row.ofKind(INSERT, 2, "A", 0),
+                                                Row.ofKind(INSERT, 2, "C", 0)))
+                                .testResult(
+                                        source ->
+                                                "SELECT f0, JSON_OBJECTAGG(f1 
VALUE f2) FROM "
+                                                        + source
+                                                        + " GROUP BY f0",
+                                        source ->
+                                                source.groupBy($("f0"))
+                                                        .select(
+                                                                $("f0"),
+                                                                jsonObjectAgg(
+                                                                        
JsonOnNull.NULL,
+                                                                        
$("f1"),
+                                                                        
$("f2"))),
+                                        ROW(INT(), VARCHAR(2000).notNull()),
+                                        ROW(INT(), STRING().notNull()),
+                                        Arrays.asList(
+                                                Row.of(1, "{\"A\":0,\"B\":0}"),
+                                                Row.of(2, 
"{\"A\":0,\"C\":0}"))),
 
-                // JSON_ARRAYAGG
-                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
-                        .withDescription("Basic Aggregation")
-                        .withSource(
-                                ROW(STRING()),
-                                Arrays.asList(
-                                        Row.ofKind(INSERT, "A"),
-                                        Row.ofKind(INSERT, (String) null),
-                                        Row.ofKind(INSERT, "C")))
-                        .testResult(
-                                source -> "SELECT JSON_ARRAYAGG(f0) FROM " + 
source,
-                                source -> 
source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
-                                ROW(VARCHAR(2000).notNull()),
-                                ROW(STRING().notNull()),
-                                
Collections.singletonList(Row.of("[\"A\",\"C\"]"))),
-                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL)
-                        .withDescription("Keeps NULLs")
-                        .withSource(
-                                ROW(STRING()),
-                                Arrays.asList(
-                                        Row.ofKind(INSERT, "A"),
-                                        Row.ofKind(INSERT, (String) null),
-                                        Row.ofKind(INSERT, "C")))
-                        .testResult(
-                                source -> "SELECT JSON_ARRAYAGG(f0 NULL ON 
NULL) FROM " + source,
-                                source -> 
source.select(jsonArrayAgg(JsonOnNull.NULL, $("f0"))),
-                                ROW(VARCHAR(2000).notNull()),
-                                ROW(STRING().notNull()),
-                                
Collections.singletonList(Row.of("[\"A\",null,\"C\"]"))),
-                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
-                        .withDescription("Retractions")
-                        .withSource(
-                                ROW(INT()),
-                                Arrays.asList(
-                                        Row.ofKind(INSERT, 1),
-                                        Row.ofKind(INSERT, 2),
-                                        Row.ofKind(INSERT, 3),
-                                        Row.ofKind(DELETE, 2)))
-                        .testResult(
-                                source -> "SELECT JSON_ARRAYAGG(f0) FROM " + 
source,
-                                source -> 
source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
-                                ROW(VARCHAR(2000).notNull()),
-                                ROW(STRING().notNull()),
-                                Collections.singletonList(Row.of("[1,3]"))));
+                        // JSON_ARRAYAGG
+                        TestSpec.forFunction(
+                                        
BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+                                .withDescription("Basic Aggregation")
+                                .withSource(
+                                        ROW(STRING()),
+                                        Arrays.asList(
+                                                Row.ofKind(INSERT, "A"),
+                                                Row.ofKind(INSERT, (String) 
null),
+                                                Row.ofKind(INSERT, "C")))
+                                .testResult(
+                                        source -> "SELECT JSON_ARRAYAGG(f0) 
FROM " + source,
+                                        source ->
+                                                source.select(
+                                                        
jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                        ROW(VARCHAR(2000).notNull()),
+                                        ROW(STRING().notNull()),
+                                        
Collections.singletonList(Row.of("[\"A\",\"C\"]"))),
+                        
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL)
+                                .withDescription("Keeps NULLs")
+                                .withSource(
+                                        ROW(STRING()),
+                                        Arrays.asList(
+                                                Row.ofKind(INSERT, "A"),
+                                                Row.ofKind(INSERT, (String) 
null),
+                                                Row.ofKind(INSERT, "C")))
+                                .testResult(
+                                        source ->
+                                                "SELECT JSON_ARRAYAGG(f0 NULL 
ON NULL) FROM "
+                                                        + source,
+                                        source ->
+                                                source.select(
+                                                        
jsonArrayAgg(JsonOnNull.NULL, $("f0"))),
+                                        ROW(VARCHAR(2000).notNull()),
+                                        ROW(STRING().notNull()),
+                                        
Collections.singletonList(Row.of("[\"A\",null,\"C\"]"))),
+                        TestSpec.forFunction(
+                                        
BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+                                .withDescription("Retractions")
+                                .withSource(
+                                        ROW(INT()),
+                                        Arrays.asList(
+                                                Row.ofKind(INSERT, 1),
+                                                Row.ofKind(INSERT, 2),
+                                                Row.ofKind(INSERT, 3),
+                                                Row.ofKind(DELETE, 2)))
+                                .testResult(
+                                        source -> "SELECT JSON_ARRAYAGG(f0) 
FROM " + source,
+                                        source ->
+                                                source.select(
+                                                        
jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                        ROW(VARCHAR(2000).notNull()),
+                                        ROW(STRING().notNull()),
+                                        
Collections.singletonList(Row.of("[1,3]"))))
+                .flatMap(
+                        spec ->
+                                Stream.of(HASHMAP_STATE_BACKEND_NAME, 
ROCKSDB_STATE_BACKEND_NAME)
+                                        .map(backend -> 
spec.copy().withStateBackend(backend)))
+                .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
index 8152f98..4be69c9 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
@@ -57,6 +57,7 @@ public class JsonObjectAggFunction
         extends BuiltInAggregateFunction<String, 
JsonObjectAggFunction.Accumulator> {
 
     private static final long serialVersionUID = 1L;
+    private static final StringData NULL_STRING_DATA = 
StringData.fromBytes(new byte[] {});
     private static final NullNode NULL_NODE = getNodeFactory().nullNode();
 
     private final transient List<DataType> argumentTypes;
@@ -107,7 +108,9 @@ public class JsonObjectAggFunction
 
         if (valueData == null) {
             if (!skipNulls) {
-                acc.map.put(keyData, null);
+                // We cannot use null for StringData here, since it's not 
supported by the
+                // StringDataSerializer, instead use a StringData with an 
empty byte[]
+                acc.map.put(keyData, NULL_STRING_DATA);
             }
         } else {
             acc.map.put(keyData, valueData);
@@ -135,7 +138,9 @@ public class JsonObjectAggFunction
             for (final StringData key : acc.map.keys()) {
                 final StringData value = acc.map.get(key);
+                // An empty StringData is the serializer-safe marker for a NULL value kept when nulls
+                // are not skipped; a plain null may still be present in state written before this fix.
                 final JsonNode valueNode =
-                        value == null
+                        value == null || value.toBytes().length == 0
                                 ? NULL_NODE
                                 : getNodeFactory().rawValueNode(new RawValue(value.toString()));
 

Reply via email to