This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9457b8792fa [FLINK-34111][table] Add support for JSON_QUOTE, 
JSON_UNQUOTE
9457b8792fa is described below

commit 9457b8792fa285ecd42576a8f7115faa7a25f109
Author: anupamaggarwal <[email protected]>
AuthorDate: Thu Jun 20 09:40:01 2024 +0530

    [FLINK-34111][table] Add support for JSON_QUOTE, JSON_UNQUOTE
    
    This closes #24967
    
    ---------
    
    Co-authored-by: Anupam Aggarwal <[email protected]>
    Co-authored-by: Jeyhun Karimov <[email protected]>
---
 docs/data/sql_functions.yml                        |   6 +
 docs/data/sql_functions_zh.yml                     |   6 +
 .../docs/reference/pyflink.table/expressions.rst   |   2 +
 flink-python/pyflink/table/expression.py           |  25 +-
 .../flink/table/api/internal/BaseExpressions.java  |  15 ++
 .../functions/BuiltInFunctionDefinitions.java      |  18 ++
 .../planner/functions/JsonFunctionsITCase.java     | 298 ++++++++++++++++++++-
 .../planner/expressions/ScalarFunctionsTest.scala  |  76 ++++++
 .../functions/scalar/JsonQuoteFunction.java        |  90 +++++++
 .../functions/scalar/JsonUnquoteFunction.java      | 128 +++++++++
 10 files changed, 661 insertions(+), 3 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 8dca117fa7b..006f7170e60 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -400,6 +400,12 @@ string:
   - sql: SUBSTR(string, integer1[, integer2])
     table: STRING.substr(INTEGER1[, INTEGER2])
     description: Returns a substring of string starting from position integer1 
with length integer2 (to the end by default).
+  - sql: JSON_QUOTE(string)
+    table: STRING.jsonQuote()
+    description: Quotes a string as a JSON value by wrapping it with double 
quote characters, escaping interior quote and special characters ('"', '\', 
'/', 'b', 'f', 'n', 'r', 't'), and returning the result as a string. If the 
argument is NULL, the function returns NULL.
+  - sql: JSON_UNQUOTE(string)
+    table: STRING.jsonUnquote()
+    description: Unquotes JSON value, unescapes escaped special characters 
('"', '\', '/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex), and returns the 
result as a string. If the argument is NULL, returns NULL. If the value does 
not start and end with double quotes or if it starts and ends with double 
quotes but is not a valid JSON string literal, the value is passed through 
unmodified.
 
 temporal:
   - sql: DATE string
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index ad8451b13f2..8f79f28f090 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -500,6 +500,12 @@ string:
   - sql: SUBSTR(string, integer1[, integer2])
     table: STRING.substr(INTEGER1[, INTEGER2])
     description: 返回字符串的子字符串,从位置 integer1 开始,长度为 integer2(默认到末尾)。
+  - sql: JSON_QUOTE(string)
+    table: STRING.jsonQuote()
+    description: Quotes a string as a JSON value by wrapping it with double 
quote characters, escaping interior quote and special characters ('"', '\', 
'/', 'b', 'f', 'n', 'r', 't'), and returning the result as a string. If the 
argument is NULL, the function returns NULL.
+  - sql: JSON_UNQUOTE(string)
+    table: STRING.jsonUnquote()
+    description: Unquotes JSON value, unescapes escaped special characters 
('"', '\', '/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex), and returns the 
result as a string. If the argument is NULL, returns NULL. If the value does 
not start and end with double quotes or if it starts and ends with double 
quotes but is not a valid JSON string literal, the value is passed through 
unmodified.
 
 temporal:
   - sql: DATE string
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 57c8ed314c8..9ee086e5359 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -309,3 +309,5 @@ JSON functions
     Expression.json_exists
     Expression.json_value
     Expression.json_query
+    Expression.json_quote
+    Expression.json_unquote
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index a8500836514..c12fb05c4e8 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -88,7 +88,8 @@ _string_doc_seealso = """
              :func:`~Expression.overlay`, :func:`~Expression.regexp_replace`,
              :func:`~Expression.regexp_extract`, :func:`~Expression.substring`,
              :py:attr:`~Expression.from_base64`, 
:py:attr:`~Expression.to_base64`,
-             :py:attr:`~Expression.ltrim`, :py:attr:`~Expression.rtrim`, 
:func:`~Expression.repeat`
+             :py:attr:`~Expression.ltrim`, :py:attr:`~Expression.rtrim`, 
:func:`~Expression.repeat`,
+             :func:`~Expression.json_quote`, :func:`~Expression.json_unquote`
 """
 
 _temporal_doc_seealso = """
@@ -186,7 +187,8 @@ def _make_string_doc():
         Expression.init_cap, Expression.like, Expression.similar, 
Expression.position,
         Expression.lpad, Expression.rpad, Expression.overlay, 
Expression.regexp_replace,
         Expression.regexp_extract, Expression.from_base64, 
Expression.to_base64,
-        Expression.ltrim, Expression.rtrim, Expression.repeat
+        Expression.ltrim, Expression.rtrim, Expression.repeat,
+        Expression.json_quote, Expression.json_unquote
     ]
 
     for func in string_funcs:
@@ -1988,6 +1990,25 @@ class Expression(Generic[T]):
                                         
on_empty._to_j_json_query_on_error_or_empty(),
                                         
on_error._to_j_json_query_on_error_or_empty())
 
+    def json_quote(self) -> 'Expression':
+        """
+        Quotes a string as a JSON value by wrapping it with double quote 
characters,
+        escaping interior quote and special characters
+        ('"', '\\', '/', 'b', 'f', 'n', 'r', 't'), and returning
+        the result as a string. If the argument is NULL, the function returns 
NULL.
+        """
+        return _unary_op("jsonQuote")(self)
+
+    def json_unquote(self) -> 'Expression':
+        """
+        Unquotes JSON value, unescapes escaped special characters
+        ('"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex) and
+        returns the result as a string.
+        If the argument is NULL, returns NULL. If the value is not a valid 
JSON
+        double-quoted string literal, it is passed through unmodified.
+        """
+        return _unary_op("jsonUnquote")(self)
+
 
 # add the docs
 _make_math_log_doc()
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 1a201d99547..5c27570c1af 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -117,6 +117,8 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NUL
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_TRUE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_EXISTS;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUOTE;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_UNQUOTE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_VALUE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LAST_VALUE;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LEFT;
@@ -1150,6 +1152,19 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(REGEXP_EXTRACT, toExpr(), 
objectToExpression(regex)));
     }
 
+    /**
+     * Returns a string by quoting the input as a JSON value and wrapping it 
with double quote
+     * characters.
+     */
+    public OutType jsonQuote() {
+        return toApiSpecificExpression(unresolvedCall(JSON_QUOTE, 
objectToExpression(toExpr())));
+    }
+
+    /** Returns a string by unquoting JSON value. */
+    public OutType jsonUnquote() {
+        return toApiSpecificExpression(unresolvedCall(JSON_UNQUOTE, 
objectToExpression(toExpr())));
+    }
+
     /** Returns the base string decoded with base64. */
     public OutType fromBase64() {
         return toApiSpecificExpression(unresolvedCall(FROM_BASE64, toExpr()));
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index f3c95330cbc..99569c3f936 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1125,6 +1125,24 @@ public final class BuiltInFunctionDefinitions {
                     
.outputTypeStrategy(explicit(DataTypes.STRING().nullable()))
                     .build();
 
+    public static final BuiltInFunctionDefinition JSON_QUOTE =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("JSON_QUOTE")
+                    .kind(SCALAR)
+                    
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.JsonQuoteFunction")
+                    .build();
+    public static final BuiltInFunctionDefinition JSON_UNQUOTE =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("JSON_UNQUOTE")
+                    .kind(SCALAR)
+                    
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.JsonUnquoteFunction")
+                    .build();
     public static final BuiltInFunctionDefinition FROM_BASE64 =
             BuiltInFunctionDefinition.newBuilder()
                     .name("fromBase64")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 9fe8a48d077..2e572955bf5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -88,7 +88,9 @@ class JsonFunctionsITCase extends BuiltInFunctionTestBase {
         testCases.addAll(jsonStringSpec());
         testCases.addAll(jsonObjectSpec());
         testCases.addAll(jsonArraySpec());
-
+        testCases.addAll(jsonQuoteSpec());
+        testCases.addAll(jsonUnquoteSpecWithValidInput());
+        testCases.addAll(jsonUnquoteSpecWithInvalidInput());
         return testCases.stream();
     }
 
@@ -840,6 +842,300 @@ class JsonFunctionsITCase extends BuiltInFunctionTestBase 
{
                                 STRING().notNull()));
     }
 
+    private static List<TestSetSpec> jsonQuoteSpec() {
+
+        return Arrays.asList(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUOTE)
+                        .onFieldsWithData(0)
+                        .testResult(
+                                nullOf(STRING()).jsonQuote(),
+                                "JSON_QUOTE(CAST(NULL AS STRING))",
+                                null,
+                                STRING().nullable()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUOTE)
+                        .onFieldsWithData(
+                                "V",
+                                "\"null\"",
+                                "[1, 2, 3]",
+                                "This is a \t test \n with special characters: 
\" \\ \b \f \r \u0041",
+                                "\"kv_pair_test\": \"\\b\\f\\r\"",
+                                "\ttab and fwd slash /",
+                                "\\u006z will not be escaped",
+                                "≠ will be escaped",
+                                null)
+                        .andDataTypes(
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().nullable())
+                        .testResult(
+                                $("f0").jsonQuote(), "JSON_QUOTE(f0)", 
"\"V\"", STRING().notNull())
+                        .testResult(
+                                $("f1").jsonQuote(),
+                                "JSON_QUOTE(f1)",
+                                "\"\\\"null\\\"\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f2").jsonQuote(),
+                                "JSON_QUOTE(f2)",
+                                "\"[1, 2, 3]\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f3").jsonQuote(),
+                                "JSON_QUOTE(f3)",
+                                "\"This is a \\t test \\n with special 
characters: \\\" \\\\ \\b \\f \\r A\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f4").jsonQuote(),
+                                "JSON_QUOTE(f4)",
+                                "\"\\\"kv_pair_test\\\": 
\\\"\\\\b\\\\f\\\\r\\\"\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f5").jsonQuote(),
+                                "JSON_QUOTE(f5)",
+                                "\"\\ttab and fwd slash \\/\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f6").jsonQuote(),
+                                "JSON_QUOTE(f6)",
+                                "\"\\\\u006z will not be escaped\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f7").jsonQuote(),
+                                "JSON_QUOTE(f7)",
+                                "\"\\u2260 will be escaped\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f8").jsonQuote(), "JSON_QUOTE(f8)", null, 
STRING().nullable()));
+    }
+
+    private static List<TestSetSpec> jsonUnquoteSpecWithValidInput() {
+
+        return Arrays.asList(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+                        .onFieldsWithData(0)
+                        .testResult(
+                                nullOf(STRING()).jsonUnquote(),
+                                "JSON_UNQUOTE(CAST(NULL AS STRING))",
+                                null,
+                                STRING().nullable()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+                        .onFieldsWithData(
+                                "\"abc\"",
+                                "\"[\"abc\"]\"",
+                                "\"[\"\\u0041\"]\"",
+                                "\"\\u0041\"",
+                                "\"[\"\\t\\u0032\"]\"",
+                                "\"[\"This is a \\t test \\n with special 
characters: \\b \\f \\r \\u0041\"]\"",
+                                "\"\"\"",
+                                "\"\"\ufffa\"",
+                                "\"a unicode \u2260\"",
+                                "\"valid unicode literal \\uD801\\uDC00\"",
+                                "[1,2,3]",
+                                "[]",
+                                "[\"string\",2]",
+                                "{\"key\":\"value\"}",
+                                "{\"key\":[\"complex\"]}",
+                                "{\"key\":1}",
+                                "1",
+                                "true",
+                                null)
+                        .andDataTypes(
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().nullable())
+                        .testResult(
+                                $("f0").jsonUnquote(),
+                                "JSON_UNQUOTE(f0)",
+                                "abc",
+                                STRING().notNull())
+                        .testResult(
+                                $("f1").jsonUnquote(),
+                                "JSON_UNQUOTE(f1)",
+                                "[\"abc\"]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f2").jsonUnquote(),
+                                "JSON_UNQUOTE(f2)",
+                                "[\"A\"]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f3").jsonUnquote(), "JSON_UNQUOTE(f3)", 
"A", STRING().notNull())
+                        .testResult(
+                                $("f4").jsonUnquote(),
+                                "JSON_UNQUOTE(f4)",
+                                "[\"\t2\"]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f5").jsonUnquote(),
+                                "JSON_UNQUOTE(f5)",
+                                "[\"This is a \t test \n with special 
characters: \b \f \r A\"]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f6").jsonUnquote(), "JSON_UNQUOTE(f6)", 
"\"", STRING().notNull())
+                        .testResult(
+                                $("f7").jsonUnquote(),
+                                "JSON_UNQUOTE(f7)",
+                                "\"\ufffa",
+                                STRING().notNull())
+                        .testResult(
+                                $("f8").jsonUnquote(),
+                                "JSON_UNQUOTE(f8)",
+                                "a unicode ≠",
+                                STRING().notNull())
+                        .testResult(
+                                $("f9").jsonUnquote(),
+                                "JSON_UNQUOTE(f9)",
+                                "valid unicode literal \uD801\uDC00",
+                                STRING().notNull())
+                        .testResult(
+                                $("f10").jsonUnquote(),
+                                "JSON_UNQUOTE(f10)",
+                                "[1,2,3]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f11").jsonUnquote(),
+                                "JSON_UNQUOTE(f11)",
+                                "[]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f12").jsonUnquote(),
+                                "JSON_UNQUOTE(f12)",
+                                "[\"string\",2]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f13").jsonUnquote(),
+                                "JSON_UNQUOTE(f13)",
+                                "{\"key\":\"value\"}",
+                                STRING().notNull())
+                        .testResult(
+                                $("f14").jsonUnquote(),
+                                "JSON_UNQUOTE(f14)",
+                                "{\"key\":[\"complex\"]}",
+                                STRING().notNull())
+                        .testResult(
+                                $("f15").jsonUnquote(),
+                                "JSON_UNQUOTE(f15)",
+                                "{\"key\":1}",
+                                STRING().notNull())
+                        .testResult(
+                                $("f16").jsonUnquote(),
+                                "JSON_UNQUOTE(f16)",
+                                "1",
+                                STRING().notNull())
+                        .testResult(
+                                $("f17").jsonUnquote(),
+                                "JSON_UNQUOTE(f17)",
+                                "true",
+                                STRING().notNull())
+                        .testResult(
+                                $("f18").jsonUnquote(),
+                                "JSON_UNQUOTE(f18)",
+                                null,
+                                STRING().nullable()));
+    }
+
+    private static List<TestSetSpec> jsonUnquoteSpecWithInvalidInput() {
+
+        return Arrays.asList(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+                        .onFieldsWithData(0)
+                        .testResult(
+                                nullOf(STRING()).jsonUnquote(),
+                                "JSON_UNQUOTE(CAST(NULL AS STRING))",
+                                null,
+                                STRING().nullable()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+                        .onFieldsWithData(
+                                "\"invalid json string pass through \\u006z\"",
+                                "\"invalid unicode literal and invalid json 
through \\u23\"",
+                                "\"invalid unicode literal and invalid json 
pass through \\u≠FFF\"",
+                                "\"invalid unicode literal but valid json pass 
through \"\"\\uzzzz\"",
+                                "\"[1,2,3]",
+                                "\"[1, 2, 3}",
+                                "\"",
+                                "[",
+                                "[}",
+                                "",
+                                null)
+                        .andDataTypes(
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().notNull(),
+                                STRING().nullable())
+                        .testResult(
+                                $("f0").jsonUnquote(),
+                                "JSON_UNQUOTE(f0)",
+                                "\"invalid json string pass through \\u006z\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f1").jsonUnquote(),
+                                "JSON_UNQUOTE(f1)",
+                                "\"invalid unicode literal and invalid json 
through \\u23\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f2").jsonUnquote(),
+                                "JSON_UNQUOTE(f2)",
+                                "\"invalid unicode literal and invalid json 
pass through \\u≠FFF\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f3").jsonUnquote(),
+                                "JSON_UNQUOTE(f3)",
+                                "\"invalid unicode literal but valid json pass 
through \"\"\\uzzzz\"",
+                                STRING().notNull())
+                        .testResult(
+                                $("f4").jsonUnquote(),
+                                "JSON_UNQUOTE(f4)",
+                                "\"[1,2,3]",
+                                STRING().notNull())
+                        .testResult(
+                                $("f5").jsonUnquote(),
+                                "JSON_UNQUOTE(f5)",
+                                "\"[1, 2, 3}",
+                                STRING().notNull())
+                        .testResult(
+                                $("f6").jsonUnquote(), "JSON_UNQUOTE(f6)", 
"\"", STRING().notNull())
+                        .testResult(
+                                $("f7").jsonUnquote(), "JSON_UNQUOTE(f7)", 
"[", STRING().notNull())
+                        .testResult(
+                                $("f8").jsonUnquote(), "JSON_UNQUOTE(f8)", 
"[}", STRING().notNull())
+                        .testResult(
+                                $("f9").jsonUnquote(), "JSON_UNQUOTE(f9)", "", 
STRING().notNull())
+                        .testResult(
+                                $("f10").jsonUnquote(),
+                                "JSON_UNQUOTE(f10)",
+                                null,
+                                STRING().nullable()));
+    }
+
     private static List<TestSetSpec> jsonArraySpec() {
         final Map<String, String> mapData = new HashMap<>();
         mapData.put("M1", "V1");
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 700f0138b97..c4d809f631c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -604,6 +604,82 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "foothebar")
   }
 
+  @Test
+  def testJsonQuote(): Unit = {
+    testSqlApi("JSON_QUOTE('null')", "\"null\"")
+    testSqlApi("JSON_QUOTE('\"null\"')", "\"\\\"null\\\"\"")
+    testSqlApi("JSON_QUOTE('[1,2,3]')", "\"[1,2,3]\"")
+    testSqlApi(
+      "JSON_QUOTE('This is a \\t test \\n with special characters: \" \\ \\b 
\\f \\r \\u0041')",
+      "\"This is a \\\\t test \\\\n with special characters: \\\" \\\\ \\\\b 
\\\\f \\\\r \\\\u0041\""
+    )
+    testSqlApi(
+      "JSON_QUOTE('\"special\": \"\\b\\f\\r\"')",
+      "\"\\\"special\\\": \\\"\\\\b\\\\f\\\\r\\\"\"")
+    testSqlApi(
+      "JSON_QUOTE('skipping backslash \\')",
+      "\"skipping backslash \\\\\""
+    )
+    testSqlApi(
+      "JSON_QUOTE('≠ will be escaped')",
+      "\"\\u2260 will be escaped\""
+    )
+    testSqlApi(
+      "JSON_QUOTE('\\u006z will not be escaped')",
+      "\"\\\\u006z will not be escaped\""
+    )
+    testSqlApi("JSON_QUOTE('1')", "\"1\"")
+    testSqlApi("JSON_QUOTE('\"1\"')", "\"\\\"1\\\"\"")
+  }
+
+  @Test
+  def testJsonUnquoteWithValidInput(): Unit = {
+    testSqlApi("JSON_UNQUOTE('\"\\\\u00aa\"')", "\\u00aa")
+    testSqlApi("JSON_UNQUOTE('\"\\u00aa\"')", "\u00aa")
+    testSqlApi("JSON_UNQUOTE('\"\\u00aa\"')", "ª")
+    testSqlApi("JSON_UNQUOTE('\"abc\"')", "abc")
+    testSqlApi("JSON_UNQUOTE('\"[abc]\"')", "[abc]")
+    testSqlApi("JSON_UNQUOTE('\"[\\u0041]\"')", "[A]")
+    testSqlApi("JSON_UNQUOTE('\"\\u0041\"')", "A")
+    testSqlApi("JSON_UNQUOTE('\"[\\t\\u0032]\"')", "[\t2]")
+    testSqlApi(
+      "JSON_UNQUOTE('\"This is a \\t test \\n with special characters: \\b \\f 
\\r \\u0041\"')",
+      "This is a \t test \n with special characters: \b \f \r A"
+    )
+    testSqlApi("JSON_UNQUOTE('\"\"')", "")
+    testSqlApi("JSON_UNQUOTE('\"\"\"')", "\"")
+    testSqlApi("JSON_UNQUOTE('[]')", "[]")
+    testSqlApi("JSON_UNQUOTE('\"\"\\ufffa\"')", "\"\ufffa")
+    testSqlApi("JSON_UNQUOTE('{\"key\":1}')", "{\"key\":1}")
+    testSqlApi("JSON_UNQUOTE('true')", "true")
+  }
+
+  @Test
+  def testJsonQuoteFollowedByUnquoteReturnsOriginal(): Unit = {
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE('test'))", "test")
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE('3'))", "3")
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE('[]'))", "[]")
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE('{}'))", "{}")
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE('{\"key\":\"value\"}'))", 
"{\"key\":\"value\"}")
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE('\"this is not a json'))", "\"this is 
not a json")
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE(''))", "")
+    testSqlApi("JSON_UNQUOTE(JSON_QUOTE('\"'))", "\"")
+  }
+
+  @Test
+  def testJsonUnquoteWithInvalidInput(): Unit = {
+    testSqlApi("JSON_UNQUOTE('\"[1, 2, 3}')", "\"[1, 2, 3}")
+    testSqlApi("JSON_UNQUOTE('\"')", "\"")
+    testSqlApi("JSON_UNQUOTE('[}')", "[}")
+    testSqlApi("JSON_UNQUOTE('1\"')", "1\"")
+    testSqlApi("JSON_UNQUOTE('[')", "[")
+    testSqlApi("JSON_UNQUOTE('')", "")
+    testSqlApi(
+      "JSON_UNQUOTE('\"invalid unicode literal but valid json pass through 
\"\"\\uzzzz\"')",
+      "\"invalid unicode literal but valid json pass through \"\"\\uzzzz\""
+    )
+  }
+
   @Test
   def testFromBase64(): Unit = {
     testSqlApi("FROM_BASE64('aGVsbG8gd29ybGQ=')", "hello world")
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonQuoteFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonQuoteFunction.java
new file mode 100644
index 00000000000..a6e19796f5e
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonQuoteFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#JSON_QUOTE}.
+ *
+ * <p>Turns a string into a JSON string literal: the value is wrapped in double quotes, the
+ * characters {@code "}, {@code \}, {@code /} and the short-form control characters are
+ * backslash-escaped, and every remaining control character (below U+0020) as well as every
+ * non-ASCII UTF-16 code unit is written as a four-digit {@code \}{@code uXXXX} escape.
+ */
+@Internal
+public class JsonQuoteFunction extends BuiltInScalarFunction {
+
+    public JsonQuoteFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.JSON_QUOTE, context);
+    }
+
+    /**
+     * Quotes the given string as a JSON string literal.
+     *
+     * @param input the value to quote, expected to be a {@link BinaryStringData}; may be null
+     * @return the quoted value as {@link BinaryStringData}, or null if {@code input} is null
+     */
+    public @Nullable Object eval(Object input) {
+        if (input == null) {
+            return null;
+        }
+        BinaryStringData bs = (BinaryStringData) input;
+        return new BinaryStringData(quote(bs.toString()));
+    }
+
+    /** Returns {@code input} escaped and surrounded by double quotes. */
+    private static String quote(String input) {
+        StringBuilder outputStr = new StringBuilder(input.length() + 2);
+        outputStr.append('"');
+        // Iterate over UTF-16 code units rather than code points: a JSON unicode escape holds
+        // exactly four hex digits, so a supplementary character has to be written as its two
+        // surrogates (e.g. U+1F600 becomes \ud83d\ude00).
+        for (int i = 0; i < input.length(); i++) {
+            char ch = input.charAt(i);
+            if (ch < 128) {
+                appendASCII(outputStr, ch);
+            } else {
+                appendUnicodeEscape(outputStr, ch);
+            }
+        }
+        outputStr.append('"');
+        return outputStr.toString();
+    }
+
+    /** Appends a single ASCII character, escaping it where JSON requires (or this function opts to). */
+    private static void appendASCII(StringBuilder outputStr, char ch) {
+        switch (ch) {
+            case '"':
+                outputStr.append("\\\"");
+                break;
+            case '\\':
+                outputStr.append("\\\\");
+                break;
+            case '/':
+                outputStr.append("\\/");
+                break;
+            case '\b':
+                outputStr.append("\\b");
+                break;
+            case '\f':
+                outputStr.append("\\f");
+                break;
+            case '\n':
+                outputStr.append("\\n");
+                break;
+            case '\r':
+                outputStr.append("\\r");
+                break;
+            case '\t':
+                outputStr.append("\\t");
+                break;
+            default:
+                if (ch < 0x20) {
+                    // remaining control characters are not allowed unescaped inside a JSON string
+                    appendUnicodeEscape(outputStr, ch);
+                } else {
+                    outputStr.append(ch);
+                }
+        }
+    }
+
+    /** Appends {@code ch} as a backslash-u escape with exactly four lower-case hex digits. */
+    private static void appendUnicodeEscape(StringBuilder outputStr, char ch) {
+        outputStr.append(String.format("\\u%04x", (int) ch));
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java
new file mode 100644
index 00000000000..9e038e37b90
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}.
+ *
+ * <p>If the input is accepted by {@link SqlJsonUtils#isJsonValue} and is enclosed in double
+ * quotes, the quotes are removed and the JSON escape sequences inside are resolved. In every
+ * other case (not a JSON value, not a quoted string, or an escape sequence that cannot be
+ * decoded) the input is returned unchanged.
+ */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+    public JsonUnquoteFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+    }
+
+    /**
+     * Unquotes the given JSON string value.
+     *
+     * @param input the value to unquote, expected to be a {@link BinaryStringData}; may be null
+     * @return the unquoted value, the unchanged input if it cannot be unquoted, or null if
+     *     {@code input} is null
+     */
+    public @Nullable Object eval(Object input) {
+        if (input == null) {
+            return null;
+        }
+        BinaryStringData bs = (BinaryStringData) input;
+        String inputStr = bs.toString();
+        try {
+            if (isValidJsonVal(inputStr)) {
+                return new BinaryStringData(unescapeValidJson(inputStr));
+            }
+        } catch (IllegalArgumentException e) {
+            // ignore exceptions on malformed input only
+        }
+        // return input as-is, either JSON is invalid or we encountered an exception while unquoting
+        return new BinaryStringData(inputStr);
+    }
+
+    private static boolean isValidJsonVal(String jsonInString) {
+        // See also BuiltInMethods.scala, IS_JSON_VALUE
+        return SqlJsonUtils.isJsonValue(jsonInString);
+    }
+
+    /**
+     * Decodes the four hex digits of a unicode escape starting at {@code curPos}.
+     *
+     * @throws IllegalArgumentException if fewer than four characters remain or any of them is
+     *     not an ASCII hex digit; {@link #eval} relies on this exception type to fall back to
+     *     returning the raw input
+     */
+    private static char fromUnicodeLiteral(String input, int curPos) {
+        // Do not rely on the JSON validity check alone: report a truncated escape as
+        // IllegalArgumentException instead of letting substring/charAt fail with an
+        // index-out-of-bounds error that eval() would not catch.
+        if (curPos + 4 > input.length()) {
+            throw new IllegalArgumentException(
+                    "Truncated unicode escape sequence at position " + curPos);
+        }
+        int code = 0;
+        for (int i = curPos; i < curPos + 4; i++) {
+            int digit = hexDigit(input.charAt(i));
+            if (digit < 0) {
+                throw new IllegalArgumentException(
+                        "Illegal unicode escape sequence: \\u"
+                                + input.substring(curPos, curPos + 4));
+            }
+            code = (code << 4) | digit;
+        }
+        return (char) code;
+    }
+
+    /** Returns the value of an ASCII hex digit, or -1 if {@code ch} is not one. */
+    private static int hexDigit(char ch) {
+        if (ch >= '0' && ch <= '9') {
+            return ch - '0';
+        }
+        if (ch >= 'a' && ch <= 'f') {
+            return ch - 'a' + 10;
+        }
+        if (ch >= 'A' && ch <= 'F') {
+            return ch - 'A' + 10;
+        }
+        return -1;
+    }
+
+    /**
+     * Resolves JSON escape sequences in {@code inputStr} (the content between the quotes).
+     *
+     * @throws IllegalArgumentException on an unknown or malformed escape sequence
+     */
+    private static String unescapeStr(String inputStr) {
+        StringBuilder result = new StringBuilder(inputStr.length());
+        int i = 0;
+        while (i < inputStr.length()) {
+            if (inputStr.charAt(i) == '\\' && i + 1 < inputStr.length()) {
+                i++; // move to the char that selects the escape
+                char ch = inputStr.charAt(i++);
+
+                switch (ch) {
+                    case '"':
+                    case '\\':
+                    case '/':
+                        // these escapes stand for the character itself
+                        result.append(ch);
+                        break;
+                    case 'b':
+                        result.append('\b');
+                        break;
+                    case 'f':
+                        result.append('\f');
+                        break;
+                    case 'n':
+                        result.append('\n');
+                        break;
+                    case 'r':
+                        result.append('\r');
+                        break;
+                    case 't':
+                        result.append('\t');
+                        break;
+                    case 'u':
+                        result.append(fromUnicodeLiteral(inputStr, i));
+                        i = i + 4; // skip the four hex digits just consumed
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Illegal escape sequence: \\" + ch);
+                }
+            } else {
+                // ordinary character, or a lone backslash at the very end: copy as-is
+                result.append(inputStr.charAt(i));
+                i++;
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * Strips the enclosing quotes and unescapes the content if {@code inputStr} is a quoted
+     * string; any other input is returned unchanged.
+     */
+    private static String unescapeValidJson(String inputStr) {
+        int len = inputStr.length();
+        // the length guard keeps charAt/substring in range even if an empty string or a single
+        // quote character ever reaches this point
+        if (len >= 2 && inputStr.charAt(0) == '"' && inputStr.charAt(len - 1) == '"') {
+            return unescapeStr(inputStr.substring(1, len - 1));
+        }
+        // string representing Json - array, object or unquoted scalar val, return as-is
+        return inputStr;
+    }
+}


Reply via email to