This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 954d32ffa59 [FLINK-39638][table] Make codegen reuse result of JSON 
parse in `JSON_VALUE`, `JSON_QUERY`
954d32ffa59 is described below

commit 954d32ffa59edb96f38fce283dde4977adb68946
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon May 11 12:35:05 2026 +0200

    [FLINK-39638][table] Make codegen reuse result of JSON parse in 
`JSON_VALUE`, `JSON_QUERY`
---
 .../planner/codegen/calls/BuiltInMethods.scala     |  16 +-
 .../planner/codegen/calls/JsonQueryCallGen.scala   |  42 +++++-
 .../planner/codegen/calls/JsonValueCallGen.scala   |  42 +++++-
 .../table/planner/codegen/JsonParseReuseTest.java  | 168 +++++++++++++++++++++
 .../planner/functions/JsonFunctionsITCase.java     |   8 +-
 .../table/runtime/functions/SqlJsonUtils.java      |  43 +++++-
 6 files changed, 312 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 20af0357350..c13f9f21771 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -482,7 +482,7 @@ object BuiltInMethods {
   val JSON_VALUE = Types.lookupMethod(
     classOf[SqlJsonUtils],
     "jsonValue",
-    classOf[String],
+    classOf[SqlJsonUtils.JsonValueContext],
     classOf[String],
     classOf[JsonValueOnEmptyOrError],
     classOf[Any],
@@ -501,6 +501,20 @@ object BuiltInMethods {
     classOf[JsonQueryOnEmptyOrError]
   )
 
+  val JSON_PARSE =
+    Types.lookupMethod(classOf[SqlJsonUtils], "jsonParse", classOf[String])
+
+  val JSON_QUERY_PARSED = Types.lookupMethod(
+    classOf[SqlJsonUtils],
+    "jsonQueryParsed",
+    classOf[SqlJsonUtils.JsonValueContext],
+    classOf[String],
+    classOf[JsonQueryReturnType],
+    classOf[JsonQueryWrapper],
+    classOf[JsonQueryOnEmptyOrError],
+    classOf[JsonQueryOnEmptyOrError]
+  )
+
   val IS_JSON_VALUE = Types.lookupMethod(classOf[SqlJsonUtils], "isJsonValue", 
classOf[String])
 
   val IS_JSON_OBJECT = Types.lookupMethod(classOf[SqlJsonUtils], 
"isJsonObject", classOf[String])
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
index 30ebc63d24d..2b386ac1fb7 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonQueryCallGen.scala
@@ -21,6 +21,7 @@ import org.apache.flink.table.api.{JsonQueryOnEmptyOrError, 
JsonQueryWrapper, Js
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
CodeGenException, CodeGenUtils, GeneratedExpression}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{qualifyEnum, 
qualifyMethod, BINARY_STRING, GENERIC_ARRAY}
 import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
 import 
org.apache.flink.table.runtime.functions.SqlJsonUtils.JsonQueryReturnType
 import org.apache.flink.table.types.logical.{ArrayType, LogicalType, 
LogicalTypeRoot}
 
@@ -32,6 +33,23 @@ import org.apache.calcite.sql.SqlJsonEmptyOrError
  * We cannot use [[MethodCallGen]] for a few different reasons. First, the 
return type of the
  * built-in Calcite function is [[Object]] and needs to be cast based on the 
inferred return type
  * instead as users can change this using the RETURNING keyword.
+ *
+ * When multiple JSON function calls share the same input expression, the 
parsed JSON context is
+ * reused via a shared member variable. For example, a query like:
+ * {{{
+ * SELECT JSON_VALUE(json_data, '$.type'), JSON_QUERY(json_data, '$.address') 
FROM t
+ * }}}
+ * generates code similar to:
+ * {{{
+ * // member variable (declared once)
+ * SqlJsonUtils.JsonValueContext jsonParsed$0;
+ *
+ * // in processElement (parse emitted only by the first function)
+ * jsonParsed$0 = SqlJsonUtils.jsonParse(field$0.toString());
+ * Object rawResult$1 = SqlJsonUtils.jsonValue(jsonParsed$0, "$.type", ...);
+ * // second call reuses jsonParsed$0 without re-parsing
+ * Object rawResult$2 = SqlJsonUtils.jsonQueryParsed(jsonParsed$0, "$.address", ...);
+ * }}}
  */
 class JsonQueryCallGen extends CallGenerator {
   override def generate(
@@ -50,8 +68,26 @@ class JsonQueryCallGen extends CallGenerator {
           } else {
             JsonQueryReturnType.STRING
           }
+          val inputTerm = s"${argTerms.head}.toString()"
+
+          val (varName, parseCode) =
+            ctx.getReusableInputUnboxingExprs(inputTerm, Int.MinValue) match {
+              case Some(expr) => (expr.resultTerm, "")
+              case None =>
+                val newVarName = CodeGenUtils.newName(ctx, "jsonParsed")
+                val typeName = classOf[SqlJsonUtils.JsonValueContext].getName
+                ctx.addReusableMember(s"$typeName $newVarName;")
+                ctx.addReusableInputUnboxingExprs(
+                  inputTerm,
+                  Int.MinValue,
+                  GeneratedExpression(newVarName, "false", "", null))
+                val assign =
+                  s"$newVarName = 
${qualifyMethod(BuiltInMethods.JSON_PARSE)}($inputTerm);"
+                (newVarName, assign)
+            }
+
           val terms = Seq(
-            s"${argTerms.head}.toString()",
+            varName,
             s"${argTerms(1)}.toString()",
             qualifyEnum(jsonQueryReturnType),
             qualifyEnum(wrapperBehavior),
@@ -61,8 +97,10 @@ class JsonQueryCallGen extends CallGenerator {
 
           val rawResultTerm = CodeGenUtils.newName(ctx, "rawResult")
           val call = s"""
+                        |$parseCode
                         |Object $rawResultTerm =
-                        |    
${qualifyMethod(BuiltInMethods.JSON_QUERY)}(${terms.mkString(", ")});
+                        |    
${qualifyMethod(BuiltInMethods.JSON_QUERY_PARSED)}(${terms
+                         .mkString(", ")});
            """.stripMargin
 
           val convertedResult = returnType.getTypeRoot match {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
index c6783d8ccc2..f28b60e85e1 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
@@ -21,6 +21,7 @@ import org.apache.flink.table.api.JsonValueOnEmptyOrError
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
CodeGenException, CodeGenUtils, GeneratedExpression}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{qualifyEnum, 
qualifyMethod, BINARY_STRING}
 import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
 
 import org.apache.calcite.sql.SqlJsonEmptyOrError
@@ -32,6 +33,23 @@ import org.apache.calcite.sql.SqlJsonEmptyOrError
  * built-in Calcite function is [[Object]] and needs to be cast based on the 
inferred return type
  * instead as users can change this using the RETURNING keyword. Furthermore, 
we need to provide the
  * proper default values in case not all arguments were given.
+ *
+ * When multiple JSON function calls share the same input expression, the 
parsed JSON context is
+ * reused via a shared member variable. For example, a query like:
+ * {{{
+ * SELECT JSON_VALUE(json_data, '$.type'), JSON_VALUE(json_data, '$.age') FROM 
t
+ * }}}
+ * generates code similar to:
+ * {{{
+ * // member variable (declared once)
+ * SqlJsonUtils.JsonValueContext jsonParsed$0;
+ *
+ * // in processElement (parse emitted only by the first function)
+ * jsonParsed$0 = SqlJsonUtils.jsonParse(field$0.toString());
+ * Object rawResult$1 = SqlJsonUtils.jsonValue(jsonParsed$0, "$.type", ...);
+ * // second call reuses jsonParsed$0 without re-parsing
+ * Object rawResult$2 = SqlJsonUtils.jsonValue(jsonParsed$0, "$.age", ...);
+ * }}}
  */
 class JsonValueCallGen extends CallGenerator {
   override def generate(
@@ -44,8 +62,26 @@ class JsonValueCallGen extends CallGenerator {
         {
           val emptyBehavior = getBehavior(operands, SqlJsonEmptyOrError.EMPTY)
           val errorBehavior = getBehavior(operands, SqlJsonEmptyOrError.ERROR)
+          val inputTerm = s"${argTerms.head}.toString()"
+
+          val (varName, parseCode) =
+            ctx.getReusableInputUnboxingExprs(inputTerm, Int.MinValue) match {
+              case Some(expr) => (expr.resultTerm, "")
+              case None =>
+                val newVarName = CodeGenUtils.newName(ctx, "jsonParsed")
+                val typeName = classOf[SqlJsonUtils.JsonValueContext].getName
+                ctx.addReusableMember(s"$typeName $newVarName;")
+                ctx.addReusableInputUnboxingExprs(
+                  inputTerm,
+                  Int.MinValue,
+                  GeneratedExpression(newVarName, "false", "", null))
+                val assign =
+                  s"$newVarName = 
${qualifyMethod(BuiltInMethods.JSON_PARSE)}($inputTerm);"
+                (newVarName, assign)
+            }
+
           val terms = Seq(
-            s"${argTerms.head}.toString()",
+            varName,
             s"${argTerms(1)}.toString()",
             qualifyEnum(emptyBehavior._1),
             emptyBehavior._2,
@@ -55,8 +91,10 @@ class JsonValueCallGen extends CallGenerator {
 
           val rawResultTerm = CodeGenUtils.newName(ctx, "rawResult")
           val call = s"""
+                        |$parseCode
                         |Object $rawResultTerm =
-                        |    
${qualifyMethod(BuiltInMethods.JSON_VALUE)}(${terms.mkString(", ")});
+                        |    
${qualifyMethod(BuiltInMethods.JSON_VALUE)}(${terms
+                         .mkString(", ")});
            """.stripMargin
 
           val convertedResult = returnType.getTypeRoot match {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/JsonParseReuseTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/JsonParseReuseTest.java
new file mode 100644
index 00000000000..a1591841b5b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/JsonParseReuseTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.codegen.calls.BuiltInMethods;
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that multiple JSON function calls on the same input reuse the parsed 
JSON. */
+class JsonParseReuseTest {
+
+    private static final Pattern JSON_PARSE_PATTERN =
+            Pattern.compile("\\b" + 
Pattern.quote(BuiltInMethods.JSON_PARSE().getName()) + "\\(");
+
+    private StreamTableEnvironment tEnv;
+
+    private static final String JSON_ROW1 =
+            
"{\"type\":\"account\",\"age\":42,\"address\":{\"city\":\"Munich\"},\"roles\":[\"user\",\"viewer\"]}";
+    private static final String JSON_ROW2 =
+            
"{\"type\":\"admin\",\"age\":30,\"address\":{\"city\":\"Berlin\"},\"roles\":[\"admin\"]}";
+
+    @BeforeEach
+    void setUp() {
+        tEnv =
+                StreamTableEnvironment.create(
+                        StreamExecutionEnvironment.getExecutionEnvironment(),
+                        EnvironmentSettings.inStreamingMode());
+        tEnv.createTemporaryView(
+                "json_src",
+                tEnv.fromValues(Row.of(JSON_ROW1, "{}"), Row.of(JSON_ROW2, 
"{\"x\":1}"))
+                        .as("json_data", "other_json"));
+    }
+
+    private List<Row> collect(final String sql) {
+        final TableResult result = tEnv.executeSql(sql);
+        final List<Row> rows = new ArrayList<>();
+        result.collect().forEachRemaining(rows::add);
+        return rows;
+    }
+
+    private static int countJsonParse(final String code) {
+        final Matcher m = JSON_PARSE_PATTERN.matcher(code);
+        int count = 0;
+        while (m.find()) {
+            count++;
+        }
+        return count;
+    }
+
+    private String extractGeneratedCode(final String sql) {
+        final Table table = tEnv.sqlQuery(sql);
+        final Transformation<?> root = 
tEnv.toChangelogStream(table).getTransformation();
+        final StringBuilder allCode = new StringBuilder();
+        for (final Transformation<?> t : root.getTransitivePredecessors()) {
+            if (t instanceof OneInputTransformation
+                    && ((OneInputTransformation<?, ?>) t).getOperatorFactory()
+                            instanceof CodeGenOperatorFactory) {
+                final CodeGenOperatorFactory<?> factory =
+                        (CodeGenOperatorFactory<?>)
+                                ((OneInputTransformation<?, ?>) 
t).getOperatorFactory();
+                allCode.append(factory.getGeneratedClass().getCode());
+            }
+        }
+        return allCode.toString();
+    }
+
+    @Test
+    void testTwoJsonValueCalls() {
+        final String sql =
+                "SELECT JSON_VALUE(json_data, '$.type'), JSON_VALUE(json_data, 
'$.age') FROM json_src";
+        final List<Row> rows = collect(sql);
+        assertThat(rows).containsExactlyInAnyOrder(Row.of("account", "42"), 
Row.of("admin", "30"));
+        assertThat(countJsonParse(extractGeneratedCode(sql)))
+                .as("Two JSON_VALUE calls on the same input should parse once")
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testTwoJsonQueryCalls() {
+        final String sql =
+                "SELECT JSON_QUERY(json_data, '$.address'), "
+                        + "JSON_QUERY(json_data, '$.roles' WITH WRAPPER) FROM 
json_src";
+        final List<Row> rows = collect(sql);
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        Row.of("{\"city\":\"Munich\"}", 
"[[\"user\",\"viewer\"]]"),
+                        Row.of("{\"city\":\"Berlin\"}", "[[\"admin\"]]"));
+        assertThat(countJsonParse(extractGeneratedCode(sql)))
+                .as("Two JSON_QUERY calls on the same input should parse once")
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testJsonValueAndJsonQueryMixed() {
+        final String sql =
+                "SELECT JSON_VALUE(json_data, '$.type'), "
+                        + "JSON_QUERY(json_data, '$.address') FROM json_src";
+        final List<Row> rows = collect(sql);
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        Row.of("account", "{\"city\":\"Munich\"}"),
+                        Row.of("admin", "{\"city\":\"Berlin\"}"));
+        assertThat(countJsonParse(extractGeneratedCode(sql)))
+                .as("JSON_VALUE + JSON_QUERY on the same input should parse 
once")
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testThreeJsonFunctionCalls() {
+        final String sql =
+                "SELECT JSON_VALUE(json_data, '$.type'), "
+                        + "JSON_VALUE(json_data, '$.age'), "
+                        + "JSON_QUERY(json_data, '$.address') FROM json_src";
+        final List<Row> rows = collect(sql);
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        Row.of("account", "42", "{\"city\":\"Munich\"}"),
+                        Row.of("admin", "30", "{\"city\":\"Berlin\"}"));
+        assertThat(countJsonParse(extractGeneratedCode(sql)))
+                .as("Three JSON function calls on the same input should parse 
once")
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testDifferentJsonInputs() {
+        final String sql =
+                "SELECT JSON_VALUE(json_data, '$.type'), "
+                        + "JSON_VALUE(other_json, '$.x') FROM json_src";
+        final List<Row> rows = collect(sql);
+        assertThat(rows).containsExactlyInAnyOrder(Row.of("account", null), 
Row.of("admin", "1"));
+        assertThat(countJsonParse(extractGeneratedCode(sql)))
+                .as("JSON_VALUE calls on different inputs should parse 
separately")
+                .isEqualTo(2);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 1ec9561b250..89a0448e95e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -292,7 +292,13 @@ class JsonFunctionsITCase extends BuiltInFunctionTestBase {
                                         "wrong"),
                         "JSON_VALUE(f0, 'strict $.[''contains blank'']' NULL 
ON EMPTY DEFAULT 'wrong' ON ERROR)",
                         "right",
-                        STRING());
+                        STRING())
+
+                // Multiple JSON_VALUE calls on the same input should reuse 
parsed JSON
+                .testSqlResult(
+                        "JSON_VALUE(f0, '$.type'), JSON_VALUE(f0, '$.age')",
+                        List.of("account", "42"),
+                        List.of(STRING(), STRING()));
     }
 
     private static List<TestSetSpec> isJsonSpec() {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
index 6f20bf76e3c..c73e9981339 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
@@ -160,6 +160,22 @@ public class SqlJsonUtils {
                 defaultValueOnError);
     }
 
+    /** Accepts a pre-parsed context from {@link #jsonParse}. */
+    public static Object jsonValue(
+            JsonValueContext parsedInput,
+            String pathSpec,
+            JsonValueOnEmptyOrError emptyBehavior,
+            Object defaultValueOnEmpty,
+            JsonValueOnEmptyOrError errorBehavior,
+            Object defaultValueOnError) {
+        return jsonValue(
+                jsonApiCommonSyntax(parsedInput, pathSpec),
+                emptyBehavior,
+                defaultValueOnEmpty,
+                errorBehavior,
+                defaultValueOnError);
+    }
+
     private static Object jsonValue(
             JsonPathContext context,
             JsonValueOnEmptyOrError emptyBehavior,
@@ -221,6 +237,22 @@ public class SqlJsonUtils {
                 errorBehavior);
     }
 
+    /** Like {@link #jsonQuery} but accepts a pre-parsed context from {@link 
#jsonParse}. */
+    public static Object jsonQueryParsed(
+            JsonValueContext parsedInput,
+            String pathSpec,
+            JsonQueryReturnType returnType,
+            JsonQueryWrapper wrapperBehavior,
+            JsonQueryOnEmptyOrError emptyBehavior,
+            JsonQueryOnEmptyOrError errorBehavior) {
+        return jsonQuery(
+                jsonApiCommonSyntax(parsedInput, pathSpec),
+                returnType,
+                wrapperBehavior,
+                emptyBehavior,
+                errorBehavior);
+    }
+
     private static Object jsonQuery(
             JsonPathContext context,
             JsonQueryReturnType returnType,
@@ -414,6 +446,15 @@ public class SqlJsonUtils {
         return JSON_PATH_JSON_PROVIDER.parse(input);
     }
 
+    /**
+     * Parses a JSON string into a reusable context object. The result can be 
passed to {@link
+     * #jsonValue} or {@link #jsonQueryParsed} to avoid re-parsing the same 
JSON string multiple
+     * times.
+     */
+    public static JsonValueContext jsonParse(String input) {
+        return jsonValueExpression(input);
+    }
+
     private static JsonValueContext jsonValueExpression(String input) {
         try {
             return JsonValueContext.withJavaObj(dejsonize(input));
@@ -618,7 +659,7 @@ public class SqlJsonUtils {
         }
     }
 
-    private static class JsonValueContext {
+    public static class JsonValueContext {
         @JsonValue public final Object obj;
         public final Exception exc;
 

Reply via email to