This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9457b8792fa [FLINK-34111][table] Add support for JSON_QUOTE,
JSON_UNQUOTE
9457b8792fa is described below
commit 9457b8792fa285ecd42576a8f7115faa7a25f109
Author: anupamaggarwal <[email protected]>
AuthorDate: Thu Jun 20 09:40:01 2024 +0530
[FLINK-34111][table] Add support for JSON_QUOTE, JSON_UNQUOTE
This closes #24967
---------
Co-authored-by: Anupam Aggarwal <[email protected]>
Co-authored-by: Jeyhun Karimov <[email protected]>
---
docs/data/sql_functions.yml | 6 +
docs/data/sql_functions_zh.yml | 6 +
.../docs/reference/pyflink.table/expressions.rst | 2 +
flink-python/pyflink/table/expression.py | 25 +-
.../flink/table/api/internal/BaseExpressions.java | 15 ++
.../functions/BuiltInFunctionDefinitions.java | 18 ++
.../planner/functions/JsonFunctionsITCase.java | 298 ++++++++++++++++++++-
.../planner/expressions/ScalarFunctionsTest.scala | 76 ++++++
.../functions/scalar/JsonQuoteFunction.java | 90 +++++++
.../functions/scalar/JsonUnquoteFunction.java | 128 +++++++++
10 files changed, 661 insertions(+), 3 deletions(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 8dca117fa7b..006f7170e60 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -400,6 +400,12 @@ string:
- sql: SUBSTR(string, integer1[, integer2])
table: STRING.substr(INTEGER1[, INTEGER2])
description: Returns a substring of string starting from position integer1
with length integer2 (to the end by default).
+ - sql: JSON_QUOTE(string)
+ table: STRING.jsonQuote()
+ description: Quotes a string as a JSON value by wrapping it with double
quote characters, escaping interior quote and special characters ('"', '\',
'/', 'b', 'f', 'n', 'r', 't'), and returning the result as a string. If the
argument is NULL, the function returns NULL.
+ - sql: JSON_UNQUOTE(string)
+ table: STRING.jsonUnquote()
+ description: Unquotes JSON value, unescapes escaped special characters
('"', '\', '/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex), and returns the
result as a string. If the argument is NULL, returns NULL. If the value does
not start and end with double quotes or if it starts and ends with double
quotes but is not a valid JSON string literal, the value is passed through
unmodified.
temporal:
- sql: DATE string
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index ad8451b13f2..8f79f28f090 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -500,6 +500,12 @@ string:
- sql: SUBSTR(string, integer1[, integer2])
table: STRING.substr(INTEGER1[, INTEGER2])
description: 返回字符串的子字符串,从位置 integer1 开始,长度为 integer2(默认到末尾)。
+ - sql: JSON_QUOTE(string)
+ table: STRING.jsonQuote()
+ description: Quotes a string as a JSON value by wrapping it with double
quote characters, escaping interior quote and special characters ('"', '\',
'/', 'b', 'f', 'n', 'r', 't'), and returning the result as a string. If the
argument is NULL, the function returns NULL.
+ - sql: JSON_UNQUOTE(string)
+ table: STRING.jsonUnquote()
+ description: Unquotes JSON value, unescapes escaped special characters
('"', '\', '/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex), and returns the
result as a string. If the argument is NULL, returns NULL. If the value does
not start and end with double quotes or if it starts and ends with double
quotes but is not a valid JSON string literal, the value is passed through
unmodified.
temporal:
- sql: DATE string
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 57c8ed314c8..9ee086e5359 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -309,3 +309,5 @@ JSON functions
Expression.json_exists
Expression.json_value
Expression.json_query
+ Expression.json_quote
+ Expression.json_unquote
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index a8500836514..c12fb05c4e8 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -88,7 +88,8 @@ _string_doc_seealso = """
:func:`~Expression.overlay`, :func:`~Expression.regexp_replace`,
:func:`~Expression.regexp_extract`, :func:`~Expression.substring`,
:py:attr:`~Expression.from_base64`,
:py:attr:`~Expression.to_base64`,
- :py:attr:`~Expression.ltrim`, :py:attr:`~Expression.rtrim`,
:func:`~Expression.repeat`
+ :py:attr:`~Expression.ltrim`, :py:attr:`~Expression.rtrim`,
:func:`~Expression.repeat`,
+ :func:`~Expression.json_quote`, :func:`~Expression.json_unquote`
"""
_temporal_doc_seealso = """
@@ -186,7 +187,8 @@ def _make_string_doc():
Expression.init_cap, Expression.like, Expression.similar,
Expression.position,
Expression.lpad, Expression.rpad, Expression.overlay,
Expression.regexp_replace,
Expression.regexp_extract, Expression.from_base64,
Expression.to_base64,
- Expression.ltrim, Expression.rtrim, Expression.repeat
+ Expression.ltrim, Expression.rtrim, Expression.repeat,
+ Expression.json_quote, Expression.json_unquote
]
for func in string_funcs:
@@ -1988,6 +1990,25 @@ class Expression(Generic[T]):
on_empty._to_j_json_query_on_error_or_empty(),
on_error._to_j_json_query_on_error_or_empty())
+ def json_quote(self) -> 'Expression':
+     """
+     Quotes a string as a JSON value by wrapping it with double quote characters,
+     escaping interior quote and special characters
+     ('"', '\\', '/', 'b', 'f', 'n', 'r', 't'), and returning
+     the result as a string. If the argument is NULL, the function returns NULL.
+     """
+     return _unary_op("jsonQuote")(self)
+
+ def json_unquote(self) -> 'Expression':
+     """
+     Unquotes JSON value, unescapes escaped special characters
+     ('"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u' hex hex hex hex) and
+     returns the result as a string.
+     If the argument is NULL, returns NULL. If the value does not start and end with
+     double quotes, or if it starts and ends with double quotes but is not a valid
+     JSON string literal, the value is passed through unmodified.
+     """
+     return _unary_op("jsonUnquote")(self)
+
# add the docs
_make_math_log_doc()
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 1a201d99547..5c27570c1af 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -117,6 +117,8 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NUL
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_TRUE;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_EXISTS;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUOTE;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_UNQUOTE;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_VALUE;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LAST_VALUE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LEFT;
@@ -1150,6 +1152,19 @@ public abstract class BaseExpressions<InType, OutType> {
unresolvedCall(REGEXP_EXTRACT, toExpr(),
objectToExpression(regex)));
}
+ /**
+ * Returns a string by quotes a string as a JSON value and wrapping it
with double quote
+ * characters.
+ */
+ public OutType jsonQuote() {
+ return toApiSpecificExpression(unresolvedCall(JSON_QUOTE,
objectToExpression(toExpr())));
+ }
+
+ /** Returns a string by unquoting JSON value. */
+ public OutType jsonUnquote() {
+ return toApiSpecificExpression(unresolvedCall(JSON_UNQUOTE,
objectToExpression(toExpr())));
+ }
+
/** Returns the base string decoded with base64. */
public OutType fromBase64() {
return toApiSpecificExpression(unresolvedCall(FROM_BASE64, toExpr()));
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index f3c95330cbc..99569c3f936 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1125,6 +1125,24 @@ public final class BuiltInFunctionDefinitions {
.outputTypeStrategy(explicit(DataTypes.STRING().nullable()))
.build();
+ /**
+  * Scalar function {@code JSON_QUOTE}: accepts exactly one character-string argument, yields a
+  * STRING, and is executed by the runtime class {@code JsonQuoteFunction}.
+  *
+  * <p>NOTE(review): per the accompanying tests the result is NOT NULL for a NOT NULL argument
+  * and nullable otherwise - presumably what {@code nullableIfArgs} provides; confirm.
+  */
+ public static final BuiltInFunctionDefinition JSON_QUOTE =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("JSON_QUOTE")
+ .kind(SCALAR)
+
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.JsonQuoteFunction")
+ .build();
+ /**
+  * Scalar function {@code JSON_UNQUOTE}: accepts exactly one character-string argument, yields
+  * a STRING, and is executed by the runtime class {@code JsonUnquoteFunction}.
+  *
+  * <p>NOTE(review): per the accompanying tests the result is NOT NULL for a NOT NULL argument
+  * and nullable otherwise - presumably what {@code nullableIfArgs} provides; confirm.
+  */
+ public static final BuiltInFunctionDefinition JSON_UNQUOTE =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("JSON_UNQUOTE")
+ .kind(SCALAR)
+
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.JsonUnquoteFunction")
+ .build();
public static final BuiltInFunctionDefinition FROM_BASE64 =
BuiltInFunctionDefinition.newBuilder()
.name("fromBase64")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 9fe8a48d077..2e572955bf5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -88,7 +88,9 @@ class JsonFunctionsITCase extends BuiltInFunctionTestBase {
testCases.addAll(jsonStringSpec());
testCases.addAll(jsonObjectSpec());
testCases.addAll(jsonArraySpec());
-
+ testCases.addAll(jsonQuoteSpec());
+ testCases.addAll(jsonUnquoteSpecWithValidInput());
+ testCases.addAll(jsonUnquoteSpecWithInvalidInput());
return testCases.stream();
}
@@ -840,6 +842,300 @@ class JsonFunctionsITCase extends BuiltInFunctionTestBase
{
STRING().notNull()));
}
+ /**
+  * Test specs for JSON_QUOTE, each checked through both the Table API expression and the SQL
+  * string: a NULL literal, plain text, embedded quotes and backslashes, control characters,
+  * a forward slash, a non-ASCII character (expected back as a {@code \}{@code uXXXX} escape)
+  * and a nullable column holding NULL.
+  */
+ private static List<TestSetSpec> jsonQuoteSpec() {
+
+ return Arrays.asList(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUOTE)
+ .onFieldsWithData(0)
+ .testResult(
+ nullOf(STRING()).jsonQuote(),
+ "JSON_QUOTE(CAST(NULL AS STRING))",
+ null,
+ STRING().nullable()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_QUOTE)
+ .onFieldsWithData(
+ "V",
+ "\"null\"",
+ "[1, 2, 3]",
+ "This is a \t test \n with special characters:
\" \\ \b \f \r \u0041",
+ "\"kv_pair_test\": \"\\b\\f\\r\"",
+ "\ttab and fwd slash /",
+ "\\u006z will not be escaped",
+ "≠ will be escaped",
+ null)
+ .andDataTypes(
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().nullable())
+ .testResult(
+ $("f0").jsonQuote(), "JSON_QUOTE(f0)",
"\"V\"", STRING().notNull())
+ .testResult(
+ $("f1").jsonQuote(),
+ "JSON_QUOTE(f1)",
+ "\"\\\"null\\\"\"",
+ STRING().notNull())
+ .testResult(
+ $("f2").jsonQuote(),
+ "JSON_QUOTE(f2)",
+ "\"[1, 2, 3]\"",
+ STRING().notNull())
+ .testResult(
+ $("f3").jsonQuote(),
+ "JSON_QUOTE(f3)",
+ "\"This is a \\t test \\n with special
characters: \\\" \\\\ \\b \\f \\r A\"",
+ STRING().notNull())
+ .testResult(
+ $("f4").jsonQuote(),
+ "JSON_QUOTE(f4)",
+ "\"\\\"kv_pair_test\\\":
\\\"\\\\b\\\\f\\\\r\\\"\"",
+ STRING().notNull())
+ .testResult(
+ $("f5").jsonQuote(),
+ "JSON_QUOTE(f5)",
+ "\"\\ttab and fwd slash \\/\"",
+ STRING().notNull())
+ .testResult(
+ $("f6").jsonQuote(),
+ "JSON_QUOTE(f6)",
+ "\"\\\\u006z will not be escaped\"",
+ STRING().notNull())
+ .testResult(
+ $("f7").jsonQuote(),
+ "JSON_QUOTE(f7)",
+ "\"\\u2260 will be escaped\"",
+ STRING().notNull())
+ .testResult(
+ $("f8").jsonQuote(), "JSON_QUOTE(f8)", null,
STRING().nullable()));
+ }
+
+ /**
+  * Test specs for JSON_UNQUOTE on valid input, each checked through both the Table API
+  * expression and the SQL string: quoted JSON strings are expected back unquoted and
+  * unescaped, while arrays, objects and unquoted scalars are expected back unchanged.
+  */
+ private static List<TestSetSpec> jsonUnquoteSpecWithValidInput() {
+
+ return Arrays.asList(
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+ .onFieldsWithData(0)
+ .testResult(
+ // must exercise jsonUnquote(): the paired SQL below is JSON_UNQUOTE
+ nullOf(STRING()).jsonUnquote(),
+ "JSON_UNQUOTE(CAST(NULL AS STRING))",
+ null,
+ STRING().nullable()),
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+ .onFieldsWithData(
+ "\"abc\"",
+ "\"[\"abc\"]\"",
+ "\"[\"\\u0041\"]\"",
+ "\"\\u0041\"",
+ "\"[\"\\t\\u0032\"]\"",
+ "\"[\"This is a \\t test \\n with special
characters: \\b \\f \\r \\u0041\"]\"",
+ "\"\"\"",
+ "\"\"\ufffa\"",
+ "\"a unicode \u2260\"",
+ "\"valid unicode literal \\uD801\\uDC00\"",
+ "[1,2,3]",
+ "[]",
+ "[\"string\",2]",
+ "{\"key\":\"value\"}",
+ "{\"key\":[\"complex\"]}",
+ "{\"key\":1}",
+ "1",
+ "true",
+ null)
+ .andDataTypes(
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().nullable())
+ .testResult(
+ $("f0").jsonUnquote(),
+ "JSON_UNQUOTE(f0)",
+ "abc",
+ STRING().notNull())
+ .testResult(
+ $("f1").jsonUnquote(),
+ "JSON_UNQUOTE(f1)",
+ "[\"abc\"]",
+ STRING().notNull())
+ .testResult(
+ $("f2").jsonUnquote(),
+ "JSON_UNQUOTE(f2)",
+ "[\"A\"]",
+ STRING().notNull())
+ .testResult(
+ $("f3").jsonUnquote(), "JSON_UNQUOTE(f3)",
"A", STRING().notNull())
+ .testResult(
+ $("f4").jsonUnquote(),
+ "JSON_UNQUOTE(f4)",
+ "[\"\t2\"]",
+ STRING().notNull())
+ .testResult(
+ $("f5").jsonUnquote(),
+ "JSON_UNQUOTE(f5)",
+ "[\"This is a \t test \n with special
characters: \b \f \r A\"]",
+ STRING().notNull())
+ .testResult(
+ $("f6").jsonUnquote(), "JSON_UNQUOTE(f6)",
"\"", STRING().notNull())
+ .testResult(
+ $("f7").jsonUnquote(),
+ "JSON_UNQUOTE(f7)",
+ "\"\ufffa",
+ STRING().notNull())
+ .testResult(
+ $("f8").jsonUnquote(),
+ "JSON_UNQUOTE(f8)",
+ "a unicode ≠",
+ STRING().notNull())
+ .testResult(
+ $("f9").jsonUnquote(),
+ "JSON_UNQUOTE(f9)",
+ "valid unicode literal \uD801\uDC00",
+ STRING().notNull())
+ .testResult(
+ $("f10").jsonUnquote(),
+ "JSON_UNQUOTE(f10)",
+ "[1,2,3]",
+ STRING().notNull())
+ .testResult(
+ $("f11").jsonUnquote(),
+ "JSON_UNQUOTE(f11)",
+ "[]",
+ STRING().notNull())
+ .testResult(
+ $("f12").jsonUnquote(),
+ "JSON_UNQUOTE(f12)",
+ "[\"string\",2]",
+ STRING().notNull())
+ .testResult(
+ $("f13").jsonUnquote(),
+ "JSON_UNQUOTE(f13)",
+ "{\"key\":\"value\"}",
+ STRING().notNull())
+ .testResult(
+ $("f14").jsonUnquote(),
+ "JSON_UNQUOTE(f14)",
+ "{\"key\":[\"complex\"]}",
+ STRING().notNull())
+ .testResult(
+ $("f15").jsonUnquote(),
+ "JSON_UNQUOTE(f15)",
+ "{\"key\":1}",
+ STRING().notNull())
+ .testResult(
+ $("f16").jsonUnquote(),
+ "JSON_UNQUOTE(f16)",
+ "1",
+ STRING().notNull())
+ .testResult(
+ $("f17").jsonUnquote(),
+ "JSON_UNQUOTE(f17)",
+ "true",
+ STRING().notNull())
+ .testResult(
+ $("f18").jsonUnquote(),
+ "JSON_UNQUOTE(f18)",
+ null,
+ STRING().nullable()));
+ }
+
+ /**
+  * Test specs for JSON_UNQUOTE on invalid input, each checked through both the Table API
+  * expression and the SQL string: values that are not valid JSON (bad unicode escapes,
+  * unbalanced quotes or brackets, the empty string) are expected back unchanged, and NULL
+  * stays NULL.
+  */
+ private static List<TestSetSpec> jsonUnquoteSpecWithInvalidInput() {
+
+ return Arrays.asList(
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+ .onFieldsWithData(0)
+ .testResult(
+ // must exercise jsonUnquote(): the paired SQL below is JSON_UNQUOTE
+ nullOf(STRING()).jsonUnquote(),
+ "JSON_UNQUOTE(CAST(NULL AS STRING))",
+ null,
+ STRING().nullable()),
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.JSON_UNQUOTE)
+ .onFieldsWithData(
+ "\"invalid json string pass through \\u006z\"",
+ "\"invalid unicode literal and invalid json
through \\u23\"",
+ "\"invalid unicode literal and invalid json
pass through \\u≠FFF\"",
+ "\"invalid unicode literal but valid json pass
through \"\"\\uzzzz\"",
+ "\"[1,2,3]",
+ "\"[1, 2, 3}",
+ "\"",
+ "[",
+ "[}",
+ "",
+ null)
+ .andDataTypes(
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().notNull(),
+ STRING().nullable())
+ .testResult(
+ $("f0").jsonUnquote(),
+ "JSON_UNQUOTE(f0)",
+ "\"invalid json string pass through \\u006z\"",
+ STRING().notNull())
+ .testResult(
+ $("f1").jsonUnquote(),
+ "JSON_UNQUOTE(f1)",
+ "\"invalid unicode literal and invalid json
through \\u23\"",
+ STRING().notNull())
+ .testResult(
+ $("f2").jsonUnquote(),
+ "JSON_UNQUOTE(f2)",
+ "\"invalid unicode literal and invalid json
pass through \\u≠FFF\"",
+ STRING().notNull())
+ .testResult(
+ $("f3").jsonUnquote(),
+ "JSON_UNQUOTE(f3)",
+ "\"invalid unicode literal but valid json pass
through \"\"\\uzzzz\"",
+ STRING().notNull())
+ .testResult(
+ $("f4").jsonUnquote(),
+ "JSON_UNQUOTE(f4)",
+ "\"[1,2,3]",
+ STRING().notNull())
+ .testResult(
+ $("f5").jsonUnquote(),
+ "JSON_UNQUOTE(f5)",
+ "\"[1, 2, 3}",
+ STRING().notNull())
+ .testResult(
+ $("f6").jsonUnquote(), "JSON_UNQUOTE(f6)",
"\"", STRING().notNull())
+ .testResult(
+ $("f7").jsonUnquote(), "JSON_UNQUOTE(f7)",
"[", STRING().notNull())
+ .testResult(
+ $("f8").jsonUnquote(), "JSON_UNQUOTE(f8)",
"[}", STRING().notNull())
+ .testResult(
+ $("f9").jsonUnquote(), "JSON_UNQUOTE(f9)", "",
STRING().notNull())
+ .testResult(
+ // must exercise jsonUnquote(): the paired SQL below is JSON_UNQUOTE
+ $("f10").jsonUnquote(),
+ "JSON_UNQUOTE(f10)",
+ null,
+ STRING().nullable()));
+ }
+
private static List<TestSetSpec> jsonArraySpec() {
final Map<String, String> mapData = new HashMap<>();
mapData.put("M1", "V1");
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 700f0138b97..c4d809f631c 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -604,6 +604,82 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"foothebar")
}
+ // Checks JSON_QUOTE through the SQL API: plain text, embedded double quotes, backslashes
+ // (which are expected back doubled) and a non-ASCII character that is expected back as a
+ // \uXXXX escape.
+ @Test
+ def testJsonQuote(): Unit = {
+ testSqlApi("JSON_QUOTE('null')", "\"null\"")
+ testSqlApi("JSON_QUOTE('\"null\"')", "\"\\\"null\\\"\"")
+ testSqlApi("JSON_QUOTE('[1,2,3]')", "\"[1,2,3]\"")
+ testSqlApi(
+ "JSON_QUOTE('This is a \\t test \\n with special characters: \" \\ \\b
\\f \\r \\u0041')",
+ "\"This is a \\\\t test \\\\n with special characters: \\\" \\\\ \\\\b
\\\\f \\\\r \\\\u0041\""
+ )
+ testSqlApi(
+ "JSON_QUOTE('\"special\": \"\\b\\f\\r\"')",
+ "\"\\\"special\\\": \\\"\\\\b\\\\f\\\\r\\\"\"")
+ testSqlApi(
+ "JSON_QUOTE('skipping backslash \\')",
+ "\"skipping backslash \\\\\""
+ )
+ testSqlApi(
+ "JSON_QUOTE('≠ will be escaped')",
+ "\"\\u2260 will be escaped\""
+ )
+ testSqlApi(
+ "JSON_QUOTE('\\u006z will not be escaped')",
+ "\"\\\\u006z will not be escaped\""
+ )
+ testSqlApi("JSON_QUOTE('1')", "\"1\"")
+ testSqlApi("JSON_QUOTE('\"1\"')", "\"\\\"1\\\"\"")
+ }
+
+ // Checks JSON_UNQUOTE through the SQL API on valid input: quoted strings are expected back
+ // unquoted with their escape sequences decoded, while arrays, objects and bare scalars are
+ // expected back unchanged.
+ @Test
+ def testJsonUnquoteWithValidInput(): Unit = {
+ testSqlApi("JSON_UNQUOTE('\"\\\\u00aa\"')", "\\u00aa")
+ testSqlApi("JSON_UNQUOTE('\"\\u00aa\"')", "\u00aa")
+ testSqlApi("JSON_UNQUOTE('\"\\u00aa\"')", "ª")
+ testSqlApi("JSON_UNQUOTE('\"abc\"')", "abc")
+ testSqlApi("JSON_UNQUOTE('\"[abc]\"')", "[abc]")
+ testSqlApi("JSON_UNQUOTE('\"[\\u0041]\"')", "[A]")
+ testSqlApi("JSON_UNQUOTE('\"\\u0041\"')", "A")
+ testSqlApi("JSON_UNQUOTE('\"[\\t\\u0032]\"')", "[\t2]")
+ testSqlApi(
+ "JSON_UNQUOTE('\"This is a \\t test \\n with special characters: \\b \\f
\\r \\u0041\"')",
+ "This is a \t test \n with special characters: \b \f \r A"
+ )
+ testSqlApi("JSON_UNQUOTE('\"\"')", "")
+ testSqlApi("JSON_UNQUOTE('\"\"\"')", "\"")
+ testSqlApi("JSON_UNQUOTE('[]')", "[]")
+ testSqlApi("JSON_UNQUOTE('\"\"\\ufffa\"')", "\"\ufffa")
+ testSqlApi("JSON_UNQUOTE('{\"key\":1}')", "{\"key\":1}")
+ testSqlApi("JSON_UNQUOTE('true')", "true")
+ }
+
+ // Round-trip check through the SQL API: JSON_UNQUOTE(JSON_QUOTE(x)) is expected to yield x
+ // for plain text, JSON-looking text, the empty string and a lone double quote.
+ @Test
+ def testJsonQuoteFollowedByUnquoteReturnsOriginal(): Unit = {
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE('test'))", "test")
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE('3'))", "3")
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE('[]'))", "[]")
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE('{}'))", "{}")
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE('{\"key\":\"value\"}'))",
"{\"key\":\"value\"}")
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE('\"this is not a json'))", "\"this is
not a json")
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE(''))", "")
+ testSqlApi("JSON_UNQUOTE(JSON_QUOTE('\"'))", "\"")
+ }
+
+ // Checks JSON_UNQUOTE through the SQL API on input that is not valid JSON: every value is
+ // expected back exactly as it was passed in.
+ @Test
+ def testJsonUnquoteWithInvalidInput(): Unit = {
+ testSqlApi("JSON_UNQUOTE('\"[1, 2, 3}')", "\"[1, 2, 3}")
+ testSqlApi("JSON_UNQUOTE('\"')", "\"")
+ testSqlApi("JSON_UNQUOTE('[}')", "[}")
+ testSqlApi("JSON_UNQUOTE('1\"')", "1\"")
+ testSqlApi("JSON_UNQUOTE('[')", "[")
+ testSqlApi("JSON_UNQUOTE('')", "")
+ testSqlApi(
+ "JSON_UNQUOTE('\"invalid unicode literal but valid json pass through
\"\"\\uzzzz\"')",
+ "\"invalid unicode literal but valid json pass through \"\"\\uzzzz\""
+ )
+ }
+
@Test
def testFromBase64(): Unit = {
testSqlApi("FROM_BASE64('aGVsbG8gd29ybGQ=')", "hello world")
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonQuoteFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonQuoteFunction.java
new file mode 100644
index 00000000000..a6e19796f5e
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonQuoteFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#JSON_QUOTE}.
+ *
+ * <p>Wraps the input in double quotes and escapes its content so that the result is a JSON
+ * string literal: {@code "}, {@code \}, {@code /} and the control characters backspace, form
+ * feed, newline, carriage return and tab get their two-character escapes; every other control
+ * character and every non-ASCII UTF-16 code unit is written as a four-digit unicode escape.
+ */
+@Internal
+public class JsonQuoteFunction extends BuiltInScalarFunction {
+
+    public JsonQuoteFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.JSON_QUOTE, context);
+    }
+
+    /**
+     * Quotes the given string as a JSON string value.
+     *
+     * @param input the string to quote, expected to be a {@link BinaryStringData}, or null
+     * @return the quoted and escaped string, or null if {@code input} is null
+     */
+    public @Nullable Object eval(Object input) {
+        if (input == null) {
+            return null;
+        }
+        BinaryStringData bs = (BinaryStringData) input;
+        return new BinaryStringData("\"" + quote(bs.toString()) + "\"");
+    }
+
+    /** Escapes {@code input} without adding the surrounding double quotes. */
+    private static String quote(String input) {
+        StringBuilder outputStr = new StringBuilder(input.length() + 2);
+
+        // Walk UTF-16 code units rather than code points: a unicode escape carries exactly four
+        // hex digits, so a supplementary character has to be written as its surrogate pair (two
+        // escapes). Formatting the full code point would yield five or more digits, and the
+        // trailing low surrogate would then be emitted a second time by the next iteration.
+        for (int i = 0; i < input.length(); i++) {
+            char ch = input.charAt(i);
+            if (ch < 128) {
+                appendASCII(outputStr, ch);
+            } else {
+                appendUnicodeEscape(outputStr, ch);
+            }
+        }
+        return outputStr.toString();
+    }
+
+    /** Appends {@code ch} as a backslash, the letter u and four lower-case hex digits. */
+    private static void appendUnicodeEscape(StringBuilder outputStr, char ch) {
+        outputStr.append(String.format("\\u%04x", (int) ch));
+    }
+
+    /** Appends an ASCII character, escaping it where a JSON string literal requires it. */
+    private static void appendASCII(StringBuilder outputStr, char ch) {
+        switch (ch) {
+            case '"':
+                outputStr.append("\\\"");
+                break;
+            case '\\':
+                outputStr.append("\\\\");
+                break;
+            case '/':
+                outputStr.append("\\/");
+                break;
+            case '\b':
+                outputStr.append("\\b");
+                break;
+            case '\f':
+                outputStr.append("\\f");
+                break;
+            case '\n':
+                outputStr.append("\\n");
+                break;
+            case '\r':
+                outputStr.append("\\r");
+                break;
+            case '\t':
+                outputStr.append("\\t");
+                break;
+            default:
+                if (ch < 0x20) {
+                    // Remaining control characters may not appear raw inside a JSON string.
+                    appendUnicodeEscape(outputStr, ch);
+                } else {
+                    outputStr.append(ch);
+                }
+        }
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java
new file mode 100644
index 00000000000..9e038e37b90
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/JsonUnquoteFunction.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#JSON_UNQUOTE}.
+ *
+ * <p>If the input is a valid JSON value that starts and ends with a double quote, the quotes
+ * are removed and the escape sequences inside are decoded. Any other input - a valid JSON
+ * array, object or unquoted scalar, or text that is not valid JSON at all - is returned
+ * unchanged.
+ */
+@Internal
+public class JsonUnquoteFunction extends BuiltInScalarFunction {
+
+    public JsonUnquoteFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.JSON_UNQUOTE, context);
+    }
+
+    /**
+     * Unquotes the given JSON string value.
+     *
+     * @param input the value to unquote, expected to be a {@link BinaryStringData}, or null
+     * @return the unquoted string, the unchanged input if it cannot be unquoted, or null if
+     *     {@code input} is null
+     */
+    public @Nullable Object eval(Object input) {
+        if (input == null) {
+            return null;
+        }
+        BinaryStringData bs = (BinaryStringData) input;
+        String inputStr = bs.toString();
+        try {
+            if (isValidJsonVal(inputStr)) {
+                return new BinaryStringData(unescapeValidJson(inputStr));
+            }
+        } catch (IllegalArgumentException e) {
+            // ignore exceptions on malformed input only
+        }
+        // return input as-is, either JSON is invalid or we encountered an exception while
+        // unquoting
+        return new BinaryStringData(inputStr);
+    }
+
+    private static boolean isValidJsonVal(String jsonInString) {
+        // See also BuiltInMethods.scala, IS_JSON_VALUE
+        return SqlJsonUtils.isJsonValue(jsonInString);
+    }
+
+    /**
+     * Decodes the four hex digits of a unicode escape starting at {@code curPos}.
+     *
+     * @throws IllegalArgumentException if fewer than four characters remain or one of them is
+     *     not a hex digit, so that the caller falls back to passing the input through
+     */
+    private static char fromUnicodeLiteral(String input, int curPos) {
+        // The JSON validity check normally guarantees four hex digits; checking here as well
+        // keeps a malformed escape inside the IllegalArgumentException path handled by eval()
+        // instead of surfacing as a StringIndexOutOfBoundsException.
+        if (curPos + 4 > input.length()) {
+            throw new IllegalArgumentException("Truncated unicode escape sequence");
+        }
+        int code = 0;
+        for (int pos = curPos; pos < curPos + 4; pos++) {
+            int digit = Character.digit(input.charAt(pos), 16);
+            if (digit < 0) {
+                throw new IllegalArgumentException(
+                        "Illegal hex digit in unicode escape sequence: " + input.charAt(pos));
+            }
+            code = (code << 4) | digit;
+        }
+        return (char) code;
+    }
+
+    /** Decodes all backslash escape sequences in the content of a JSON string literal. */
+    private static String unescapeStr(String inputStr) {
+        StringBuilder result = new StringBuilder(inputStr.length());
+        int i = 0;
+        while (i < inputStr.length()) {
+            char current = inputStr.charAt(i);
+            if (current != '\\' || i + 1 >= inputStr.length()) {
+                // ordinary character, or a trailing backslash with nothing left to escape
+                result.append(current);
+                i++;
+                continue;
+            }
+
+            char ch = inputStr.charAt(i + 1);
+            i += 2; // consume the backslash and the escape character
+            switch (ch) {
+                case '"':
+                case '\\':
+                case '/':
+                    result.append(ch);
+                    break;
+                case 'b':
+                    result.append('\b');
+                    break;
+                case 'f':
+                    result.append('\f');
+                    break;
+                case 'n':
+                    result.append('\n');
+                    break;
+                case 'r':
+                    result.append('\r');
+                    break;
+                case 't':
+                    result.append('\t');
+                    break;
+                case 'u':
+                    result.append(fromUnicodeLiteral(inputStr, i));
+                    i += 4; // skip the four hex digits
+                    break;
+                default:
+                    throw new IllegalArgumentException("Illegal escape sequence: \\" + ch);
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * Unquotes an input that already passed the JSON validity check: a quoted string value is
+     * stripped of its quotes and unescaped, anything else is returned as-is.
+     */
+    private static String unescapeValidJson(String inputStr) {
+        int len = inputStr.length();
+        // Require at least two characters so that a lone double quote can never count as both
+        // the opening and the closing quote.
+        if (len >= 2 && inputStr.charAt(0) == '"' && inputStr.charAt(len - 1) == '"') {
+            return unescapeStr(inputStr.substring(1, len - 1));
+        }
+        // string representing JSON - array, object or unquoted scalar value, return as-is
+        return inputStr;
+    }
+}