twalthr commented on a change in pull request #16776:
URL: https://github.com/apache/flink/pull/16776#discussion_r688491862



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
##########
@@ -1323,4 +1327,211 @@ public OutType jsonExists(String path, 
JsonExistsOnError onError) {
     public OutType jsonExists(String path) {
         return toApiSpecificExpression(unresolvedCall(JSON_EXISTS, toExpr(), 
valueLiteral(path)));
     }
+
+    /**
+     * Extracts a scalar from a JSON string.
+     *
+     * <p>This method searches a JSON string for a given path expression and 
returns the value if
+     * the value at that path is scalar. Non-scalar values cannot be returned. 
By default, the value
+     * is returned as {@link DataTypes#STRING()}. Using {@param returningType} 
a different type can
+     * be chosen, with the following types being supported:
+     *
+     * <ul>
+     *   <li>{@link LogicalTypeRoot#VARCHAR}

Review comment:
       Link to `DataTypes.STRING()` instead.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
##########
@@ -1323,4 +1327,211 @@ public OutType jsonExists(String path, 
JsonExistsOnError onError) {
     public OutType jsonExists(String path) {
         return toApiSpecificExpression(unresolvedCall(JSON_EXISTS, toExpr(), 
valueLiteral(path)));
     }
+
+    /**
+     * Extracts a scalar from a JSON string.
+     *
+     * <p>This method searches a JSON string for a given path expression and 
returns the value if
+     * the value at that path is scalar. Non-scalar values cannot be returned. 
By default, the value
+     * is returned as {@link DataTypes#STRING()}. Using {@param returningType} 
a different type can
+     * be chosen, with the following types being supported:
+     *
+     * <ul>
+     *   <li>{@link LogicalTypeRoot#VARCHAR}
+     *   <li>{@link LogicalTypeRoot#BOOLEAN}
+     *   <li>{@link LogicalTypeRoot#INTEGER}
+     *   <li>{@link LogicalTypeRoot#DOUBLE}
+     * </ul>
+     *
+     * <p>For empty path expressions or errors a behavior can be defined to 
either return {@code
+     * null}, raise an error or return a defined default value instead.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // STRING: "true"
+     * lit("{\"a\": true}").jsonValue("$.a")
+     *
+     * // BOOLEAN: true
+     * lit("{\"a\": true}").jsonValue("$.a", DataTypes.BOOLEAN())
+     *
+     * // BOOLEAN: "false"
+     * lit("{\"a\": true}").jsonValue("lax $.b",
+     *     JsonValueOnEmptyOrError.DEFAULT, false, 
JsonValueOnEmptyOrError.NULL, null)

Review comment:
       Do we actually need `JsonValueOnEmptyOrError.NULL` if it is always 
followed by a `null` argument, making it semantically equal to 
`JsonValueOnEmptyOrError.DEFAULT`?
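
       To illustrate the question, here is a minimal sketch. The `Behavior` 
enum and the `resolve` helper are hypothetical stand-ins, not the Flink API; 
they only model the three behaviors named in the Javadoc above (return null, 
raise an error, return a default value).

```java
public class OnEmptyOrErrorSketch {

    /** Hypothetical stand-in for JsonValueOnEmptyOrError; not the Flink enum. */
    public enum Behavior {
        NULL,
        ERROR,
        DEFAULT
    }

    /** Returns the fallback value that the given behavior would produce. */
    public static Object resolve(Behavior behavior, Object defaultValue) {
        switch (behavior) {
            case NULL:
                // The default value is ignored for this behavior.
                return null;
            case DEFAULT:
                return defaultValue;
            case ERROR:
            default:
                throw new IllegalStateException("JSON_VALUE failed");
        }
    }

    public static void main(String[] args) {
        // Both calls yield the same fallback, which is the point of the question.
        System.out.println(resolve(Behavior.NULL, null));
        System.out.println(resolve(Behavior.DEFAULT, null));
    }
}
```

       Under this model, `NULL` with a `null` argument cannot be told apart 
from `DEFAULT` with a `null` default.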

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
##########
@@ -1513,7 +1514,7 @@
                             sequence(
                                     
logical(LogicalTypeFamily.CHARACTER_STRING),
                                     
and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
-                                    LITERAL,
+                                    TYPE_LITERAL,

Review comment:
       Move this to a separate hotfix commit; it is useful on its own.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
##########
@@ -197,6 +198,10 @@ public static InputTypeStrategy comparable(
     public static final LiteralArgumentTypeStrategy LITERAL_OR_NULL =
             new LiteralArgumentTypeStrategy(true);
 
+    /** Strategy that checks if an argument is a literal. */

Review comment:
       nit: `is a type literal`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonValueConverter.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter.converters;
+
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlJsonEmptyOrError;
+import org.apache.calcite.sql.SqlJsonValueEmptyOrErrorBehavior;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/** Conversion for {@link BuiltInFunctionDefinitions#JSON_VALUE}. */
+class JsonValueConverter extends CustomizedConverter {
+
+    @Override
+    public RexNode convert(CallExpression call, 
CallExpressionConvertRule.ConvertContext context) {
+        // To keep things simple we only allow the following call signatures:
+        // (jsonValue, jsonPath)
+        // (jsonValue, jsonPath, returningType)
+        // (jsonValue, jsonPath, onEmpty, defaultOnEmpty, onError, 
defaultOnError)
+        // (jsonValue, jsonPath, returningType, onEmpty, defaultOnEmpty, 
onError, defaultOnError)
+        checkArgumentNumber(call, 2, 3, 6, 7);
+
+        final List<RexNode> operands = new LinkedList<>();
+        operands.add(context.toRexNode(call.getChildren().get(0)));
+        operands.add(context.toRexNode(call.getChildren().get(1)));
+        if (call.getChildren().size() <= 2) {
+            return 
context.getRelBuilder().call(FlinkSqlOperatorTable.JSON_VALUE, operands);
+        }
+
+        final Optional<DataType> returnType = getExplicitReturnType(call);
+
+        final int onEmptyIdx = returnType.isPresent() ? 3 : 2;
+        getBehavior(call, onEmptyIdx)
+                .map(
+                        onEmpty ->
+                                getBehaviorOperands(
+                                        context,
+                                        SqlJsonEmptyOrError.EMPTY,
+                                        onEmpty,
+                                        call.getChildren().get(onEmptyIdx + 
1)))
+                .ifPresent(operands::addAll);
+
+        final int onErrorIdx = onEmptyIdx + 2;
+        getBehavior(call, onErrorIdx)
+                .map(
+                        onError ->
+                                getBehaviorOperands(
+                                        context,
+                                        SqlJsonEmptyOrError.ERROR,
+                                        onError,
+                                        call.getChildren().get(onErrorIdx + 
1)))
+                .ifPresent(operands::addAll);
+
+        if (returnType.isPresent()) {
+            final FlinkTypeFactory typeFactory =
+                    (FlinkTypeFactory) 
context.getRelBuilder().getTypeFactory();

Review comment:
       nit: use `ShortcutUtils.unwrapTypeFactory()` with a static import; it 
would be shorter

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL;
+
+/** Strategy that checks if an argument is a type literal. */
+@Internal
+public class TypeLiteralArgumentTypeStrategy implements ArgumentTypeStrategy {
+
+    @Override
+    public Optional<DataType> inferArgumentType(
+            CallContext callContext, int argumentPos, boolean throwOnFailure) {
+        if (!LITERAL.inferArgumentType(callContext, argumentPos, 
throwOnFailure).isPresent()) {
+            return Optional.empty();
+        }
+
+        return callContext.getArgumentValue(argumentPos, DataType.class);
+    }
+
+    @Override
+    public Signature.Argument getExpectedArgument(
+            FunctionDefinition functionDefinition, int argumentPos) {
+        return Signature.Argument.of("<TYPE LITERAL NOT NULL>");

Review comment:
       how about `<DATA TYPE>` to keep it simple for users?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/converters/JsonValueConverter.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter.converters;
+
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlJsonEmptyOrError;
+import org.apache.calcite.sql.SqlJsonValueEmptyOrErrorBehavior;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/** Conversion for {@link BuiltInFunctionDefinitions#JSON_VALUE}. */
+class JsonValueConverter extends CustomizedConverter {
+
+    @Override
+    public RexNode convert(CallExpression call, 
CallExpressionConvertRule.ConvertContext context) {
+        // To keep things simple we only allow the following call signatures:
+        // (jsonValue, jsonPath)
+        // (jsonValue, jsonPath, returningType)
+        // (jsonValue, jsonPath, onEmpty, defaultOnEmpty, onError, 
defaultOnError)

Review comment:
       Maybe I'm too opinionated here, but wouldn't `(jsonValue, jsonPath, 
returningType, defaultOnEmptyOrError)` be more common?
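
       A rough sketch of the idea, assuming a short form that reuses one 
default for both the empty and the error case. The class and the 
`valueOrDefault` methods are hypothetical and only model the argument shapes; 
they are not part of the PR or of the Flink API.

```java
public class JsonValueOverloadSketch {

    /** Full form: separate defaults for the empty case and the error case. */
    public static Object valueOrDefault(
            Object extracted, boolean failed, Object defaultOnEmpty, Object defaultOnError) {
        if (failed) {
            return defaultOnError;
        }
        // A null extraction result models "nothing found at the path".
        return extracted != null ? extracted : defaultOnEmpty;
    }

    /** Short form: one default covers both cases by delegating to the full form. */
    public static Object valueOrDefault(
            Object extracted, boolean failed, Object defaultOnEmptyOrError) {
        return valueOrDefault(extracted, failed, defaultOnEmptyOrError, defaultOnEmptyOrError);
    }

    public static void main(String[] args) {
        System.out.println(valueOrDefault(null, false, "n/a")); // empty case
        System.out.println(valueOrDefault(null, true, "n/a")); // error case
        System.out.println(valueOrDefault("x", false, "n/a")); // value found
    }
}
```

       The full form would still be available for callers that need different 
defaults per case.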

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import org.apache.calcite.sql.{SqlJsonEmptyOrError, 
SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, 
qualifyEnum, qualifyMethod}
+import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
+import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_VALUE]].
+ *
+ * We cannot use [[MethodCallGen]] for a few different reasons. First, the 
return type of the
+ * built-in Calcite function is [[Object]] and needs to be cast based on the 
inferred return type
+ * instead as users can change this using the RETURNING keyword. Furthermore, 
we need to provide
+ * the proper default values in case not all arguments were given.
+ */
+class JsonValueCallGen extends CallGenerator {
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType): GeneratedExpression = {
+
+    generateCallWithStmtIfArgsNotNull(ctx, returnType, operands, 
resultNullable = true) {
+      rawTerms => {
+        val emptyBehavior = getBehavior(operands, SqlJsonEmptyOrError.EMPTY)
+        val errorBehavior = getBehavior(operands, SqlJsonEmptyOrError.ERROR)
+        val terms = Seq(
+          s"${rawTerms.head}.toString()",

Review comment:
       nit: `rawTerms` might not be a good name for readability; maybe 
`argTerms`?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.runners.Parameterized;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.lit;
+import static org.apache.flink.table.api.Expressions.nullOf;
+
+/** Tests for built-in JSON functions. */
+public class JsonFunctionsITCase extends BuiltInFunctionTestBase {
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static List<TestSpec> testData() throws Exception {
+        final List<TestSpec> testCases = new ArrayList<>();
+        testCases.addAll(jsonExists());
+        testCases.addAll(jsonValue());
+
+        return testCases;
+    }
+
+    private static List<TestSpec> jsonExists() throws Exception {
+        final InputStream jsonResource =
+                
JsonFunctionsITCase.class.getResourceAsStream("/json/json-exists.json");
+        if (jsonResource == null) {
+            throw new IllegalStateException(
+                    String.format("%s: Missing test data.", 
JsonFunctionsITCase.class.getName()));
+        }
+
+        final String jsonValue = IOUtils.toString(jsonResource, 
Charset.defaultCharset());
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_EXISTS)
+                        .onFieldsWithData(jsonValue)
+                        .andDataTypes(DataTypes.STRING())
+
+                        // NULL
+                        .testResult(
+                                nullOf(DataTypes.STRING()).jsonExists("lax $"),
+                                "JSON_EXISTS(CAST(NULL AS STRING), 'lax $')",
+                                null,
+                                DataTypes.BOOLEAN())
+
+                        // Path variants
+                        .testResult(
+                                $("f0").jsonExists("lax $"),
+                                "JSON_EXISTS(f0, 'lax $')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.type"),
+                                "JSON_EXISTS(f0, 'lax $.type')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.author.address.city"),
+                                "JSON_EXISTS(f0, 'lax $.author.address.city')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.metadata.tags[0]"),
+                                "JSON_EXISTS(f0, 'lax $.metadata.tags[0]')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.metadata.tags[3]"),
+                                "JSON_EXISTS(f0, 'lax $.metadata.tags[3]')",
+                                false,
+                                DataTypes.BOOLEAN())
+                        // This should pass, but is broken due to
+                        // https://issues.apache.org/jira/browse/CALCITE-4717.
+                        // .testResult(
+                        //        $("f0").jsonExists("lax 
$.metadata.references.url"),
+                        //        "JSON_EXISTS(f0, 'lax 
$.metadata.references.url')",
+                        //        true,
+                        //        DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.metadata.references[0].url"),
+                                "JSON_EXISTS(f0, 'lax 
$.metadata.references[0].url')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.metadata.references[0].invalid"),
+                                "JSON_EXISTS(f0, 'lax 
$.metadata.references[0].invalid')",
+                                false,
+                                DataTypes.BOOLEAN())
+
+                        // ON ERROR
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.TRUE),
+                                "JSON_EXISTS(f0, 'strict $.invalid' TRUE ON 
ERROR)",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.FALSE),
+                                "JSON_EXISTS(f0, 'strict $.invalid' FALSE ON 
ERROR)",
+                                false,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.UNKNOWN),
+                                "JSON_EXISTS(f0, 'strict $.invalid' UNKNOWN ON 
ERROR)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        .testSqlRuntimeError(
+                                "JSON_EXISTS(f0, 'strict $.invalid' ERROR ON 
ERROR)",
+                                "No results for path: $['invalid']")
+                        .testTableApiRuntimeError(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.ERROR),
+                                "No results for path: $['invalid']"));
+    }
+
+    private static List<TestSpec> jsonValue() throws Exception {
+        final InputStream jsonResource =
+                
JsonFunctionsITCase.class.getResourceAsStream("/json/json-value.json");
+        if (jsonResource == null) {
+            throw new IllegalStateException(
+                    String.format("%s: Missing test data.", 
JsonFunctionsITCase.class.getName()));
+        }
+
+        final String jsonValue = IOUtils.toString(jsonResource, 
Charset.defaultCharset());
+        return Arrays.asList(

Review comment:
       The list is unnecessary here and above.
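
       One possible reading, sketched with hypothetical names: since each 
helper builds exactly one spec, it could return that spec directly and let 
`testData()` assemble the list. `Spec` below is a stand-in for `TestSpec`, 
not the real test base class.

```java
import java.util.Arrays;
import java.util.List;

public class TestDataSketch {

    /** Hypothetical stand-in for the TestSpec of the test base class. */
    public static final class Spec {
        public final String name;

        Spec(String name) {
            this.name = name;
        }
    }

    // Each helper returns its single spec instead of a one-element list.
    static Spec jsonExists() {
        return new Spec("JSON_EXISTS");
    }

    static Spec jsonValue() {
        return new Spec("JSON_VALUE");
    }

    /** The list is built once, where the parameterized runner needs it. */
    public static List<Spec> testData() {
        return Arrays.asList(jsonExists(), jsonValue());
    }

    public static void main(String[] args) {
        for (Spec spec : testData()) {
            System.out.println(spec.name);
        }
    }
}
```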

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.runners.Parameterized;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.lit;
+import static org.apache.flink.table.api.Expressions.nullOf;
+
+/** Tests for built-in JSON functions. */
+public class JsonFunctionsITCase extends BuiltInFunctionTestBase {
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static List<TestSpec> testData() throws Exception {
+        final List<TestSpec> testCases = new ArrayList<>();
+        testCases.addAll(jsonExists());
+        testCases.addAll(jsonValue());
+
+        return testCases;
+    }
+
+    private static List<TestSpec> jsonExists() throws Exception {
+        final InputStream jsonResource =
+                
JsonFunctionsITCase.class.getResourceAsStream("/json/json-exists.json");
+        if (jsonResource == null) {
+            throw new IllegalStateException(
+                    String.format("%s: Missing test data.", 
JsonFunctionsITCase.class.getName()));
+        }
+
+        final String jsonValue = IOUtils.toString(jsonResource, 
Charset.defaultCharset());
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_EXISTS)
+                        .onFieldsWithData(jsonValue)
+                        .andDataTypes(DataTypes.STRING())
+
+                        // NULL
+                        .testResult(
+                                nullOf(DataTypes.STRING()).jsonExists("lax $"),
+                                "JSON_EXISTS(CAST(NULL AS STRING), 'lax $')",
+                                null,
+                                DataTypes.BOOLEAN())
+
+                        // Path variants
+                        .testResult(
+                                $("f0").jsonExists("lax $"),
+                                "JSON_EXISTS(f0, 'lax $')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.type"),
+                                "JSON_EXISTS(f0, 'lax $.type')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.author.address.city"),
+                                "JSON_EXISTS(f0, 'lax $.author.address.city')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.metadata.tags[0]"),
+                                "JSON_EXISTS(f0, 'lax $.metadata.tags[0]')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.metadata.tags[3]"),
+                                "JSON_EXISTS(f0, 'lax $.metadata.tags[3]')",
+                                false,
+                                DataTypes.BOOLEAN())
+                        // This should pass, but is broken due to
+                        // https://issues.apache.org/jira/browse/CALCITE-4717.
+                        // .testResult(
+                        //        $("f0").jsonExists("lax 
$.metadata.references.url"),
+                        //        "JSON_EXISTS(f0, 'lax 
$.metadata.references.url')",
+                        //        true,
+                        //        DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.metadata.references[0].url"),
+                                "JSON_EXISTS(f0, 'lax 
$.metadata.references[0].url')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.metadata.references[0].invalid"),
+                                "JSON_EXISTS(f0, 'lax 
$.metadata.references[0].invalid')",
+                                false,
+                                DataTypes.BOOLEAN())
+
+                        // ON ERROR
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.TRUE),
+                                "JSON_EXISTS(f0, 'strict $.invalid' TRUE ON 
ERROR)",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.FALSE),
+                                "JSON_EXISTS(f0, 'strict $.invalid' FALSE ON 
ERROR)",
+                                false,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.UNKNOWN),
+                                "JSON_EXISTS(f0, 'strict $.invalid' UNKNOWN ON 
ERROR)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        .testSqlRuntimeError(
+                                "JSON_EXISTS(f0, 'strict $.invalid' ERROR ON 
ERROR)",
+                                "No results for path: $['invalid']")
+                        .testTableApiRuntimeError(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.ERROR),
+                                "No results for path: $['invalid']"));
+    }
+
+    private static List<TestSpec> jsonValue() throws Exception {
+        final InputStream jsonResource =
+                
JsonFunctionsITCase.class.getResourceAsStream("/json/json-value.json");
+        if (jsonResource == null) {
+            throw new IllegalStateException(
+                    String.format("%s: Missing test data.", 
JsonFunctionsITCase.class.getName()));
+        }
+
+        final String jsonValue = IOUtils.toString(jsonResource, 
Charset.defaultCharset());
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_VALUE)
+                        .onFieldsWithData(jsonValue)
+                        .andDataTypes(DataTypes.STRING())
+
+                        // NULL and invalid types
+
+                        .testResult(
+                                lit(null, DataTypes.STRING()).jsonValue("lax 
$"),
+                                "JSON_VALUE(CAST(NULL AS STRING), 'lax $')",
+                                null,
+                                DataTypes.STRING(),
+                                DataTypes.VARCHAR(2000))
+                        .testTableApiValidationError(

Review comment:
       I would be up for dropping this case.

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.runners.Parameterized;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.lit;
+import static org.apache.flink.table.api.Expressions.nullOf;
+
+/** Tests for built-in JSON functions. */
+public class JsonFunctionsITCase extends BuiltInFunctionTestBase {
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static List<TestSpec> testData() throws Exception {
+        final List<TestSpec> testCases = new ArrayList<>();
+        testCases.addAll(jsonExists());
+        testCases.addAll(jsonValue());
+
+        return testCases;
+    }
+
+    private static List<TestSpec> jsonExists() throws Exception {
+        final InputStream jsonResource =
+                
JsonFunctionsITCase.class.getResourceAsStream("/json/json-exists.json");
+        if (jsonResource == null) {
+            throw new IllegalStateException(
+                    String.format("%s: Missing test data.", 
JsonFunctionsITCase.class.getName()));
+        }
+
+        final String jsonValue = IOUtils.toString(jsonResource, 
Charset.defaultCharset());
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_EXISTS)
+                        .onFieldsWithData(jsonValue)
+                        .andDataTypes(DataTypes.STRING())
+
+                        // NULL
+                        .testResult(
+                                nullOf(DataTypes.STRING()).jsonExists("lax $"),
+                                "JSON_EXISTS(CAST(NULL AS STRING), 'lax $')",
+                                null,
+                                DataTypes.BOOLEAN())
+
+                        // Path variants
+                        .testResult(
+                                $("f0").jsonExists("lax $"),
+                                "JSON_EXISTS(f0, 'lax $')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.type"),
+                                "JSON_EXISTS(f0, 'lax $.type')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.author.address.city"),
+                                "JSON_EXISTS(f0, 'lax $.author.address.city')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.metadata.tags[0]"),
+                                "JSON_EXISTS(f0, 'lax $.metadata.tags[0]')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax $.metadata.tags[3]"),
+                                "JSON_EXISTS(f0, 'lax $.metadata.tags[3]')",
+                                false,
+                                DataTypes.BOOLEAN())
+                        // This should pass, but is broken due to
+                        // https://issues.apache.org/jira/browse/CALCITE-4717.
+                        // .testResult(
+                        //        $("f0").jsonExists("lax 
$.metadata.references.url"),
+                        //        "JSON_EXISTS(f0, 'lax 
$.metadata.references.url')",
+                        //        true,
+                        //        DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.metadata.references[0].url"),
+                                "JSON_EXISTS(f0, 'lax 
$.metadata.references[0].url')",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("lax 
$.metadata.references[0].invalid"),
+                                "JSON_EXISTS(f0, 'lax 
$.metadata.references[0].invalid')",
+                                false,
+                                DataTypes.BOOLEAN())
+
+                        // ON ERROR
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.TRUE),
+                                "JSON_EXISTS(f0, 'strict $.invalid' TRUE ON 
ERROR)",
+                                true,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.FALSE),
+                                "JSON_EXISTS(f0, 'strict $.invalid' FALSE ON 
ERROR)",
+                                false,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.UNKNOWN),
+                                "JSON_EXISTS(f0, 'strict $.invalid' UNKNOWN ON 
ERROR)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        .testSqlRuntimeError(
+                                "JSON_EXISTS(f0, 'strict $.invalid' ERROR ON 
ERROR)",
+                                "No results for path: $['invalid']")
+                        .testTableApiRuntimeError(
+                                $("f0").jsonExists("strict $.invalid", 
JsonExistsOnError.ERROR),
+                                "No results for path: $['invalid']"));
+    }
+
+    private static List<TestSpec> jsonValue() throws Exception {
+        final InputStream jsonResource =
+                
JsonFunctionsITCase.class.getResourceAsStream("/json/json-value.json");
+        if (jsonResource == null) {
+            throw new IllegalStateException(
+                    String.format("%s: Missing test data.", 
JsonFunctionsITCase.class.getName()));
+        }
+
+        final String jsonValue = IOUtils.toString(jsonResource, 
Charset.defaultCharset());
+        return Arrays.asList(
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_VALUE)
+                        .onFieldsWithData(jsonValue)
+                        .andDataTypes(DataTypes.STRING())
+
+                        // NULL and invalid types
+
+                        .testResult(
+                                lit(null, DataTypes.STRING()).jsonValue("lax 
$"),
+                                "JSON_VALUE(CAST(NULL AS STRING), 'lax $')",
+                                null,
+                                DataTypes.STRING(),
+                                DataTypes.VARCHAR(2000))
+                        .testTableApiValidationError(
+                                lit(42).jsonValue("$"),
+                                String.format(
+                                        "Invalid function call:%n"
+                                                + "JSON_VALUE(INT NOT NULL, 
CHAR(1) NOT NULL, STRING, "
+                                                + 
"SYMBOL('org.apache.flink.table.api.JsonValueOnEmptyOrError') NOT NULL, "
+                                                + "NULL, 
SYMBOL('org.apache.flink.table.api.JsonValueOnEmptyOrError') NOT NULL, NULL)"))
+
+                        // Supported Data Types
+
+                        .testResult(

Review comment:
       We should still try to keep the tests concise (especially until the test 
base is more performant). Are those 4 test specs important?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonValueCallGen.scala
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import org.apache.calcite.sql.{SqlJsonEmptyOrError, 
SqlJsonValueEmptyOrErrorBehavior}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_STRING, 
qualifyEnum, qualifyMethod}
+import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallWithStmtIfArgsNotNull
+import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot}
+
+/**
+ * [[CallGenerator]] for [[BuiltInMethods.JSON_VALUE]].
+ *
+ * We cannot use [[MethodCallGen]] for a few different reasons. First, the 
return type of the
+ * built-in Calcite function is [[Object]] and needs to be cast based on the 
inferred return type
+ * instead as users can change this using the RETURNING keyword. Furthermore, 
we need to provide
+ * the proper default values in case not all arguments were given.
+ */
+class JsonValueCallGen extends CallGenerator {
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType): GeneratedExpression = {
+
+    generateCallWithStmtIfArgsNotNull(ctx, returnType, operands, 
resultNullable = true) {
+      rawTerms => {
+        val emptyBehavior = getBehavior(operands, SqlJsonEmptyOrError.EMPTY)
+        val errorBehavior = getBehavior(operands, SqlJsonEmptyOrError.ERROR)
+        val terms = Seq(
+          s"${rawTerms.head}.toString()",
+          s"${rawTerms(1)}.toString()",
+          qualifyEnum(emptyBehavior._1),
+          emptyBehavior._2,
+          qualifyEnum(errorBehavior._1),
+          errorBehavior._2
+        )
+
+        val rawResultTerm = ctx.addReusableLocalVariable("Object", "rawResult")

Review comment:
       I think this method is only used for code splitting. There is nothing to 
reuse. You can simply create a variable in the generated code.
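
       A minimal sketch of what "create a variable in the generated code" could 
look like, written as self-contained Java rather than against the planner's 
Scala code. The class `LocalVariableSketch` and its helpers `newName` and 
`generateCall` are hypothetical names for illustration only, not Flink's API. 
The idea is that the generator picks a unique name and emits the declaration 
inline, next to its single use, instead of registering a reusable member.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a code generator; this is not Flink's actual API. */
public class LocalVariableSketch {

    // Counter that keeps generated variable names unique (an assumption of this sketch).
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Returns a unique variable name made of the prefix, a '$', and a running number. */
    static String newName(String prefix) {
        return prefix + "$" + COUNTER.getAndIncrement();
    }

    /**
     * Emits Java source that declares the raw result as a plain local variable
     * right where it is used, then casts it into the result term.
     */
    static String generateCall(String methodCall, String resultTerm, String castType) {
        String rawResultTerm = newName("rawResult");
        return String.join(
                "\n",
                "Object " + rawResultTerm + " = " + methodCall + ";",
                resultTerm + " = (" + castType + ") " + rawResultTerm + ";");
    }

    public static void main(String[] args) {
        // Prints the two generated statements for a made-up call expression.
        System.out.println(generateCall("jsonValue(input, path)", "result", "String"));
    }
}
```

       Since the variable lives only inside the generated statement block, 
nothing needs to be tracked on the generator context in this sketch.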

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
##########
@@ -1323,4 +1327,209 @@ public OutType jsonExists(String path, 
JsonExistsOnError onError) {
     public OutType jsonExists(String path) {
         return toApiSpecificExpression(unresolvedCall(JSON_EXISTS, toExpr(), 
valueLiteral(path)));
     }
+
+    /**
+     * Extracts a scalar from a JSON string.
+     *
+     * <p>This method searches a JSON string for a given path expression and 
returns the value if
+     * the value at that path is scalar. Non-scalar values cannot be returned. 
By default, the value
+     * is returned as {@link DataTypes#STRING()}. Using {@param returningType} 
a different type can
+     * be chosen, with the following types being supported:
+     *
+     * <ul>
+     *   <li>{@link LogicalTypeRoot#VARCHAR}
+     *   <li>{@link LogicalTypeRoot#BOOLEAN}
+     *   <li>{@link LogicalTypeRoot#INTEGER}
+     *   <li>{@link LogicalTypeRoot#DOUBLE}
+     * </ul>
+     *
+     * <p>For empty path expressions or errors a behavior can be defined to 
either return {@code
+     * null}, raise an error or return a defined default value instead.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // STRING: "true"
+     * lit("{\"a\": true}").jsonValue("$.a")
+     *
+     * // BOOLEAN: true
+     * lit("{\"a\": true}").jsonValue("$.a", DataTypes.BOOLEAN())
+     *
+     * // BOOLEAN: "false"
+     * lit("{\"a\": true}").jsonValue("lax $.b",
+     *     JsonValueOnEmptyOrError.DEFAULT, false, 
JsonValueOnEmptyOrError.NULL, null)
+     *
+     * // BOOLEAN: "false"
+     * lit("{\"a\": true}").jsonValue("strict $.b",
+     *     JsonValueOnEmptyOrError.NULL, null, 
JsonValueOnEmptyOrError.DEFAULT, false)
+     * }</pre>
+     *
+     * @param path JSON path to extract.
+     * @param returningType Type to convert the extracted scalar to, otherwise 
defaults to {@link
+     *     DataTypes#STRING()}.
+     * @param onEmpty Behavior in case the path expression is empty.
+     * @param defaultOnEmpty Default value to return if the path expression is 
empty and {@param
+     *     onEmpty} is set to {@link JsonValueOnEmptyOrError#DEFAULT}.
+     * @param onError Behavior in case of an error.
+     * @param defaultOnError Default value to return if there is an error and 
{@param onError} is
+     *     set to {@link JsonValueOnEmptyOrError#DEFAULT}.
+     * @return The extracted scalar value.
+     */
+    public OutType jsonValue(
+            String path,
+            DataType returningType,
+            JsonValueOnEmptyOrError onEmpty,
+            InType defaultOnEmpty,
+            JsonValueOnEmptyOrError onError,
+            InType defaultOnError) {
+        return toApiSpecificExpression(
+                unresolvedCall(
+                        JSON_VALUE,
+                        toExpr(),
+                        valueLiteral(path),
+                        typeLiteral(returningType),
+                        valueLiteral(onEmpty),
+                        objectToExpression(defaultOnEmpty),
+                        valueLiteral(onError),
+                        objectToExpression(defaultOnError)));
+    }
+
+    /**
+     * Extracts a scalar from a JSON string.
+     *
+     * <p>This method searches a JSON string for a given path expression and 
returns the value if
+     * the value at that path is scalar. Non-scalar values cannot be returned. 
By default, the value
+     * is returned as {@link DataTypes#STRING()}.
+     *
+     * <p>For empty path expressions or errors a behavior can be defined to 
either return {@code
+     * null}, raise an error or return a defined default value instead.
+     *
+     * <p>See also {@link #jsonValue(String, DataType, 
JsonValueOnEmptyOrError, Object,
+     * JsonValueOnEmptyOrError, Object)}.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // STRING: "true"
+     * lit("{\"a\": true}").jsonValue("$.a")
+     *
+     * // BOOLEAN: true
+     * lit("{\"a\": true}").jsonValue("$.a", DataTypes.BOOLEAN())
+     *
+     * // BOOLEAN: "false"
+     * lit("{\"a\": true}").jsonValue("lax $.b",
+     *     JsonValueOnEmptyOrError.DEFAULT, false, 
JsonValueOnEmptyOrError.NULL, null)
+     *
+     * // BOOLEAN: "false"
+     * lit("{\"a\": true}").jsonValue("strict $.b",
+     *     JsonValueOnEmptyOrError.NULL, null, 
JsonValueOnEmptyOrError.DEFAULT, false)
+     * }</pre>
+     *
+     * @param path JSON path to extract.
+     * @param onEmpty Behavior in case the path expression is empty.
+     * @param defaultOnEmpty Default value to return if the path expression is 
empty and {@param
+     *     onEmpty} is set to {@link JsonValueOnEmptyOrError#DEFAULT}.
+     * @param onError Behavior in case of an error.
+     * @param defaultOnError Default value to return if there is an error and 
{@param onError} is
+     *     set to {@link JsonValueOnEmptyOrError#DEFAULT}.
+     * @return The extracted scalar value.
+     */
+    public OutType jsonValue(

Review comment:
       This comment is not addressed yet.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
