This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 954d32ffa59 [FLINK-39638][table] Make codegen reuse result of JSON
parse in `JSON_VALUE`, `JSON_QUERY`
954d32ffa59 is described below
commit 954d32ffa59edb96f38fce283dde4977adb68946
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon May 11 12:35:05 2026 +0200
[FLINK-39638][table] Make codegen reuse result of JSON parse in
`JSON_VALUE`, `JSON_QUERY`
---
.../planner/codegen/calls/BuiltInMethods.scala | 16 +-
.../planner/codegen/calls/JsonQueryCallGen.scala | 42 +++++-
.../planner/codegen/calls/JsonValueCallGen.scala | 42 +++++-
.../table/planner/codegen/JsonParseReuseTest.java | 168 +++++++++++++++++++++
.../planner/functions/JsonFunctionsITCase.java | 8 +-
.../table/runtime/functions/SqlJsonUtils.java | 43 +++++-
6 files changed, 312 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 20af0357350..c13f9f21771 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -482,7 +482,7 @@ object BuiltInMethods {
val JSON_VALUE = Types.lookupMethod(
classOf[SqlJsonUtils],
"jsonValue",
- classOf[String],
+ classOf[SqlJsonUtils.JsonValueContext],
classOf[String],
classOf[JsonValueOnEmptyOrError],
classOf[Any],
@@ -501,6 +501,20 @@ object BuiltInMethods {
classOf[JsonQueryOnEmptyOrError]
)
+ val JSON_PARSE = // resolves SqlJsonUtils.jsonParse(String): JSON text -> reusable JsonValueContext
+ Types.lookupMethod(classOf[SqlJsonUtils], "jsonParse", classOf[String])
+
+ val JSON_QUERY_PARSED = Types.lookupMethod( /*
+ * Resolves SqlJsonUtils.jsonQueryParsed, i.e. JSON_QUERY evaluated against the
+ * JsonValueContext produced by JSON_PARSE instead of a raw String input.
+ * NOTE(review): JSON_VALUE above was switched in place to the JsonValueContext overload of
+ * jsonValue, whereas JSON_QUERY keeps its String signature and this separate constant is
+ * added. JsonQueryCallGen now emits JSON_QUERY_PARSED, so check whether JSON_QUERY is still
+ * referenced anywhere and consider aligning the naming of the two functions. */
+ classOf[SqlJsonUtils],
+ "jsonQueryParsed",
+ classOf[SqlJsonUtils.JsonValueContext],
+ classOf[String],
+ classOf[JsonQueryReturnType],
+ classOf[JsonQueryWrapper],
+ classOf[JsonQueryOnEmptyOrError],
+ classOf[JsonQueryOnEmptyOrError]
+ )
+
val IS_JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonValue",
classOf[String])
val IS_JSON_OBJECT = Types.lookupMethod(classOf[SqlJsonUtils],
"isJsonObject", classOf[String])
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
index 30ebc63d24d..2b386ac1fb7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
@@ -21,6 +21,7 @@ import org.apache.flink.table.api.{JsonQueryOnEmptyOrError,
JsonQueryWrapper, Js
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
CodeGenException, CodeGenUtils, GeneratedExpression}
import org.apache.flink.table.planner.codegen.CodeGenUtils.{qualifyEnum,
qualifyMethod, BINARY_STRING, GENERIC_ARRAY}
import
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
import
org.apache.flink.table.runtime.functions.SqlJsonUtils.JsonQueryReturnType
import org.apache.flink.table.types.logical.{ArrayType, LogicalType,
LogicalTypeRoot}
@@ -32,6 +33,23 @@ import org.apache.calcite.sql.SqlJsonEmptyOrError
* We cannot use [[MethodCallGen]] for a few different reasons. First, the
return type of the
* built-in Calcite function is [[Object]] and needs to be cast based on the
inferred return type
* instead as users can change this using the RETURNING keyword.
+ *
+ * When multiple JSON function calls share the same input expression, the
parsed JSON context is
+ * reused via a shared member variable. For example, a query like:
+ * {{{
+ * SELECT JSON_VALUE(json_data, '$.type'), JSON_QUERY(json_data, '$.address')
FROM t
+ * }}}
+ * generates code similar to:
+ * {{{
+ * // member variable (declared once)
+ * SqlJsonUtils.JsonValueContext jsonParsed$0;
+ *
+ * // in processElement (parse emitted only by the first function)
+ * jsonParsed$0 = SqlJsonUtils.jsonParse(field$0.toString());
+ * Object rawResult$1 = SqlJsonUtils.jsonValue(jsonParsed$0, "$.type", ...);
+ * // second call reuses jsonParsed$0 without re-parsing
+ * Object rawResult$2 = SqlJsonUtils.jsonQueryParsed(jsonParsed$0, "$.address", ...);
+ * }}}
*/
class JsonQueryCallGen extends CallGenerator {
override def generate(
@@ -50,8 +68,26 @@ class JsonQueryCallGen extends CallGenerator {
} else {
JsonQueryReturnType.STRING
}
+ val inputTerm = s"${argTerms.head}.toString()"
+
+ val (varName, parseCode) =
+ ctx.getReusableInputUnboxingExprs(inputTerm, Int.MinValue) match {
+ case Some(expr) => (expr.resultTerm, "")
+ case None =>
+ val newVarName = CodeGenUtils.newName(ctx, "jsonParsed")
+ val typeName = classOf[SqlJsonUtils.JsonValueContext].getName
+ ctx.addReusableMember(s"$typeName $newVarName;")
+ ctx.addReusableInputUnboxingExprs(
+ inputTerm,
+ Int.MinValue,
+ GeneratedExpression(newVarName, "false", "", null))
+ val assign =
+ s"$newVarName =
${qualifyMethod(BuiltInMethods.JSON_PARSE)}($inputTerm);"
+ (newVarName, assign)
+ }
+
val terms = Seq(
- s"${argTerms.head}.toString()",
+ varName,
s"${argTerms(1)}.toString()",
qualifyEnum(jsonQueryReturnType),
qualifyEnum(wrapperBehavior),
@@ -61,8 +97,10 @@ class JsonQueryCallGen extends CallGenerator {
val rawResultTerm = CodeGenUtils.newName(ctx, "rawResult")
val call = s"""
+ |$parseCode
|Object $rawResultTerm =
- |
${qualifyMethod(BuiltInMethods.JSON_QUERY)}(${terms.mkString(", ")});
+ |
${qualifyMethod(BuiltInMethods.JSON_QUERY_PARSED)}(${terms
+ .mkString(", ")});
""".stripMargin
val convertedResult = returnType.getTypeRoot match {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
index c6783d8ccc2..f28b60e85e1 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
@@ -21,6 +21,7 @@ import org.apache.flink.table.api.JsonValueOnEmptyOrError
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
CodeGenException, CodeGenUtils, GeneratedExpression}
import org.apache.flink.table.planner.codegen.CodeGenUtils.{qualifyEnum,
qualifyMethod, BINARY_STRING}
import
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
import org.apache.calcite.sql.SqlJsonEmptyOrError
@@ -32,6 +33,23 @@ import org.apache.calcite.sql.SqlJsonEmptyOrError
* built-in Calcite function is [[Object]] and needs to be cast based on the
inferred return type
* instead as users can change this using the RETURNING keyword. Furthermore,
we need to provide the
* proper default values in case not all arguments were given.
+ *
+ * When multiple JSON function calls share the same input expression, the
parsed JSON context is
+ * reused via a shared member variable. For example, a query like:
+ * {{{
+ * SELECT JSON_VALUE(json_data, '$.type'), JSON_VALUE(json_data, '$.age') FROM
t
+ * }}}
+ * generates code similar to:
+ * {{{
+ * // member variable (declared once)
+ * SqlJsonUtils.JsonValueContext jsonParsed$0;
+ *
+ * // in processElement (parse emitted only by the first function)
+ * jsonParsed$0 = SqlJsonUtils.jsonParse(field$0.toString());
+ * Object rawResult$1 = SqlJsonUtils.jsonValue(jsonParsed$0, "$.type", ...);
+ * // second call reuses jsonParsed$0 without re-parsing
+ * Object rawResult$2 = SqlJsonUtils.jsonValue(jsonParsed$0, "$.age", ...);
+ * }}}
*/
class JsonValueCallGen extends CallGenerator {
override def generate(
@@ -44,8 +62,26 @@ class JsonValueCallGen extends CallGenerator {
{
val emptyBehavior = getBehavior(operands, SqlJsonEmptyOrError.EMPTY)
val errorBehavior = getBehavior(operands, SqlJsonEmptyOrError.ERROR)
+ val inputTerm = s"${argTerms.head}.toString()"
+
+ val (varName, parseCode) =
+ ctx.getReusableInputUnboxingExprs(inputTerm, Int.MinValue) match {
+ case Some(expr) => (expr.resultTerm, "")
+ case None =>
+ val newVarName = CodeGenUtils.newName(ctx, "jsonParsed")
+ val typeName = classOf[SqlJsonUtils.JsonValueContext].getName
+ ctx.addReusableMember(s"$typeName $newVarName;")
+ ctx.addReusableInputUnboxingExprs(
+ inputTerm,
+ Int.MinValue,
+ GeneratedExpression(newVarName, "false", "", null))
+ val assign =
+ s"$newVarName =
${qualifyMethod(BuiltInMethods.JSON_PARSE)}($inputTerm);"
+ (newVarName, assign)
+ }
+
val terms = Seq(
- s"${argTerms.head}.toString()",
+ varName,
s"${argTerms(1)}.toString()",
qualifyEnum(emptyBehavior._1),
emptyBehavior._2,
@@ -55,8 +91,10 @@ class JsonValueCallGen extends CallGenerator {
val rawResultTerm = CodeGenUtils.newName(ctx, "rawResult")
val call = s"""
+ |$parseCode
|Object $rawResultTerm =
- |
${qualifyMethod(BuiltInMethods.JSON_VALUE)}(${terms.mkString(", ")});
+ |
${qualifyMethod(BuiltInMethods.JSON_VALUE)}(${terms
+ .mkString(", ")});
""".stripMargin
val convertedResult = returnType.getTypeRoot match {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/JsonParseReuseTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/JsonParseReuseTest.java
new file mode 100644
index 00000000000..a1591841b5b
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/JsonParseReuseTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.codegen.calls.BuiltInMethods;
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that multiple JSON function calls on the same input reuse the parsed JSON.
+ *
+ * <p>Each test checks the query result. It also counts the textual occurrences of {@code
+ * jsonParse(} in the code generated for {@code CodeGenOperatorFactory}-based operators of the
+ * query. The expected count is 1 when all calls share an input column and 2 for two distinct
+ * input columns.
+ *
+ * <p>NOTE(review): every query here evaluates all JSON calls unconditionally. Cases where the
+ * first JSON call for an input sits in a conditionally evaluated expression (e.g. {@code CASE
+ * WHEN}), or where its path argument may be null while a later call's is not, are not covered.
+ * Consider adding them, because the shared parsed context is assigned only by the first generated
+ * call.
+ */
+class JsonParseReuseTest {
+
+ private static final Pattern JSON_PARSE_PATTERN =
+ Pattern.compile("\\b" +
Pattern.quote(BuiltInMethods.JSON_PARSE().getName()) + "\\(");
+
+ private StreamTableEnvironment tEnv;
+
+ private static final String JSON_ROW1 =
+
"{\"type\":\"account\",\"age\":42,\"address\":{\"city\":\"Munich\"},\"roles\":[\"user\",\"viewer\"]}";
+ private static final String JSON_ROW2 =
+
"{\"type\":\"admin\",\"age\":30,\"address\":{\"city\":\"Berlin\"},\"roles\":[\"admin\"]}";
+
+ @BeforeEach
+ void setUp() {
+ tEnv =
+ StreamTableEnvironment.create(
+ StreamExecutionEnvironment.getExecutionEnvironment(),
+ EnvironmentSettings.inStreamingMode());
+ tEnv.createTemporaryView(
+ "json_src",
+ tEnv.fromValues(Row.of(JSON_ROW1, "{}"), Row.of(JSON_ROW2,
"{\"x\":1}"))
+ .as("json_data", "other_json"));
+ }
+
+ private List<Row> collect(final String sql) {
+ final TableResult result = tEnv.executeSql(sql);
+ final List<Row> rows = new ArrayList<>();
+ result.collect().forEachRemaining(rows::add);
+ return rows;
+ }
+
+ private static int countJsonParse(final String code) {
+ final Matcher m = JSON_PARSE_PATTERN.matcher(code);
+ int count = 0;
+ while (m.find()) {
+ count++;
+ }
+ return count;
+ }
+
+ private String extractGeneratedCode(final String sql) {
+ final Table table = tEnv.sqlQuery(sql);
+ final Transformation<?> root =
tEnv.toChangelogStream(table).getTransformation();
+ final StringBuilder allCode = new StringBuilder();
+ for (final Transformation<?> t : root.getTransitivePredecessors()) {
+ if (t instanceof OneInputTransformation
+ && ((OneInputTransformation<?, ?>) t).getOperatorFactory()
+ instanceof CodeGenOperatorFactory) {
+ final CodeGenOperatorFactory<?> factory =
+ (CodeGenOperatorFactory<?>)
+ ((OneInputTransformation<?, ?>)
t).getOperatorFactory();
+ allCode.append(factory.getGeneratedClass().getCode());
+ }
+ }
+ return allCode.toString();
+ }
+
+ @Test
+ void testTwoJsonValueCalls() {
+ final String sql =
+ "SELECT JSON_VALUE(json_data, '$.type'), JSON_VALUE(json_data,
'$.age') FROM json_src";
+ final List<Row> rows = collect(sql);
+ assertThat(rows).containsExactlyInAnyOrder(Row.of("account", "42"),
Row.of("admin", "30"));
+ assertThat(countJsonParse(extractGeneratedCode(sql)))
+ .as("Two JSON_VALUE calls on the same input should parse once")
+ .isEqualTo(1);
+ }
+
+ @Test
+ void testTwoJsonQueryCalls() {
+ final String sql =
+ "SELECT JSON_QUERY(json_data, '$.address'), "
+ + "JSON_QUERY(json_data, '$.roles' WITH WRAPPER) FROM
json_src";
+ final List<Row> rows = collect(sql);
+ assertThat(rows)
+ .containsExactlyInAnyOrder(
+ Row.of("{\"city\":\"Munich\"}",
"[[\"user\",\"viewer\"]]"),
+ Row.of("{\"city\":\"Berlin\"}", "[[\"admin\"]]"));
+ assertThat(countJsonParse(extractGeneratedCode(sql)))
+ .as("Two JSON_QUERY calls on the same input should parse once")
+ .isEqualTo(1);
+ }
+
+ @Test
+ void testJsonValueAndJsonQueryMixed() {
+ final String sql =
+ "SELECT JSON_VALUE(json_data, '$.type'), "
+ + "JSON_QUERY(json_data, '$.address') FROM json_src";
+ final List<Row> rows = collect(sql);
+ assertThat(rows)
+ .containsExactlyInAnyOrder(
+ Row.of("account", "{\"city\":\"Munich\"}"),
+ Row.of("admin", "{\"city\":\"Berlin\"}"));
+ assertThat(countJsonParse(extractGeneratedCode(sql)))
+ .as("JSON_VALUE + JSON_QUERY on the same input should parse
once")
+ .isEqualTo(1);
+ }
+
+ @Test
+ void testThreeJsonFunctionCalls() {
+ final String sql =
+ "SELECT JSON_VALUE(json_data, '$.type'), "
+ + "JSON_VALUE(json_data, '$.age'), "
+ + "JSON_QUERY(json_data, '$.address') FROM json_src";
+ final List<Row> rows = collect(sql);
+ assertThat(rows)
+ .containsExactlyInAnyOrder(
+ Row.of("account", "42", "{\"city\":\"Munich\"}"),
+ Row.of("admin", "30", "{\"city\":\"Berlin\"}"));
+ assertThat(countJsonParse(extractGeneratedCode(sql)))
+ .as("Three JSON function calls on the same input should parse
once")
+ .isEqualTo(1);
+ }
+
+ @Test
+ void testDifferentJsonInputs() {
+ final String sql =
+ "SELECT JSON_VALUE(json_data, '$.type'), "
+ + "JSON_VALUE(other_json, '$.x') FROM json_src";
+ final List<Row> rows = collect(sql);
+ assertThat(rows).containsExactlyInAnyOrder(Row.of("account", null),
Row.of("admin", "1"));
+ assertThat(countJsonParse(extractGeneratedCode(sql)))
+ .as("JSON_VALUE calls on different inputs should parse
separately")
+ .isEqualTo(2);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 1ec9561b250..89a0448e95e 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -292,7 +292,13 @@ class JsonFunctionsITCase extends BuiltInFunctionTestBase {
"wrong"),
"JSON_VALUE(f0, 'strict $.[''contains blank'']' NULL
ON EMPTY DEFAULT 'wrong' ON ERROR)",
"right",
- STRING());
+ STRING())
+
+ // Multiple JSON_VALUE calls on the same input should reuse
parsed JSON
+ .testSqlResult(
+ "JSON_VALUE(f0, '$.type'), JSON_VALUE(f0, '$.age')",
+ List.of("account", "42"),
+ List.of(STRING(), STRING()));
}
private static List<TestSetSpec> isJsonSpec() {
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
index 6f20bf76e3c..c73e9981339 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
@@ -160,6 +160,22 @@ public class SqlJsonUtils {
defaultValueOnError);
}
+ /**
+ * Evaluates JSON_VALUE against a context previously produced by {@link #jsonParse}. It is used
+ * instead of a raw JSON string, so one parsed input can be evaluated with several paths without
+ * being parsed again.
+ *
+ * <p>{@code parsedInput} and {@code pathSpec} are combined via {@code jsonApiCommonSyntax}. The
+ * resulting path context and the remaining arguments are forwarded unchanged to the private
+ * {@code jsonValue(JsonPathContext, ...)} overload.
+ *
+ * <p>NOTE(review): the planner's generated code keeps the parsed context in an operator member.
+ * That member is assigned only by the first generated JSON_VALUE/JSON_QUERY call for a given
+ * input term (see JsonValueCallGen / JsonQueryCallGen). If that assignment is on a code path that
+ * is not executed for a record (e.g. a conditionally evaluated branch), this method could receive
+ * the previous record's context or {@code null}. Null handling is not visible here — confirm.
+ *
+ * @param parsedInput context returned by {@link #jsonParse}
+ * @param pathSpec JSON path expression to evaluate
+ * @param emptyBehavior ON EMPTY behavior, forwarded unchanged
+ * @param defaultValueOnEmpty default value for the ON EMPTY case, forwarded unchanged
+ * @param errorBehavior ON ERROR behavior, forwarded unchanged
+ * @param defaultValueOnError default value for the ON ERROR case, forwarded unchanged
+ * @return the result of the private {@code jsonValue(JsonPathContext, ...)} overload
+ */
+ public static Object jsonValue(
+ JsonValueContext parsedInput,
+ String pathSpec,
+ JsonValueOnEmptyOrError emptyBehavior,
+ Object defaultValueOnEmpty,
+ JsonValueOnEmptyOrError errorBehavior,
+ Object defaultValueOnError) {
+ return jsonValue(
+ jsonApiCommonSyntax(parsedInput, pathSpec),
+ emptyBehavior,
+ defaultValueOnEmpty,
+ errorBehavior,
+ defaultValueOnError);
+ }
+
private static Object jsonValue(
JsonPathContext context,
JsonValueOnEmptyOrError emptyBehavior,
@@ -221,6 +237,22 @@ public class SqlJsonUtils {
errorBehavior);
}
+ /**
+ * Like {@link #jsonQuery} but accepts a pre-parsed context from {@link #jsonParse} instead of a
+ * raw JSON string.
+ *
+ * <p>{@code parsedInput} and {@code pathSpec} are combined via {@code jsonApiCommonSyntax}. The
+ * resulting path context and the remaining arguments are forwarded unchanged to the private
+ * {@code jsonQuery(JsonPathContext, ...)} overload.
+ *
+ * <p>NOTE(review): the pre-parsed JSON_VALUE entry point is an overload named {@code
+ * jsonValue}, while this one uses the distinct name {@code jsonQueryParsed`; consider aligning
+ * the naming. Also, the planner's generated code keeps the parsed context in an operator member.
+ * That member is assigned only by the first generated JSON_VALUE/JSON_QUERY call for a given
+ * input term. If that assignment is skipped for a record (e.g. a conditionally evaluated
+ * branch), this method could receive a stale or {@code null} context — confirm.
+ *
+ * @param parsedInput context returned by {@link #jsonParse}
+ * @param pathSpec JSON path expression to evaluate
+ * @param returnType forwarded unchanged
+ * @param wrapperBehavior forwarded unchanged
+ * @param emptyBehavior ON EMPTY behavior, forwarded unchanged
+ * @param errorBehavior ON ERROR behavior, forwarded unchanged
+ * @return the result of the private {@code jsonQuery(JsonPathContext, ...)} overload
+ */
+ public static Object jsonQueryParsed(
+ JsonValueContext parsedInput,
+ String pathSpec,
+ JsonQueryReturnType returnType,
+ JsonQueryWrapper wrapperBehavior,
+ JsonQueryOnEmptyOrError emptyBehavior,
+ JsonQueryOnEmptyOrError errorBehavior) {
+ return jsonQuery(
+ jsonApiCommonSyntax(parsedInput, pathSpec),
+ returnType,
+ wrapperBehavior,
+ emptyBehavior,
+ errorBehavior);
+ }
+
private static Object jsonQuery(
JsonPathContext context,
JsonQueryReturnType returnType,
@@ -414,6 +446,15 @@ public class SqlJsonUtils {
return JSON_PATH_JSON_PROVIDER.parse(input);
}
+ /**
+ * Parses a JSON string into a reusable context object. The result can be passed to {@link
+ * #jsonValue} or {@link #jsonQueryParsed} to avoid re-parsing the same JSON string multiple
+ * times.
+ *
+ * <p>Delegates to {@code jsonValueExpression}, which wraps {@code dejsonize(input)} in {@code
+ * JsonValueContext.withJavaObj}. NOTE(review): how parse failures are handled is not visible
+ * here. Presumably they are captured in the context's {@code exc} field rather than thrown —
+ * confirm.
+ *
+ * @param input JSON text to parse
+ * @return the parsed context
+ */
+ public static JsonValueContext jsonParse(String input) {
+ return jsonValueExpression(input);
+ }
+
private static JsonValueContext jsonValueExpression(String input) {
try {
return JsonValueContext.withJavaObj(dejsonize(input));
@@ -618,7 +659,7 @@ public class SqlJsonUtils {
}
}
- private static class JsonValueContext {
+ public static class JsonValueContext {
@JsonValue public final Object obj;
public final Exception exc;