This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ba9b4f1 [FLINK-23757][python] Support json_exists and json_value
ba9b4f1 is described below
commit ba9b4f1039037d4a2da4f90a2f83612ccf2fa708
Author: Ingo Bürk <[email protected]>
AuthorDate: Wed Aug 18 09:15:23 2021 +0200
[FLINK-23757][python] Support json_exists and json_value
This closes #16874.
---
flink-python/pyflink/table/expression.py | 114 ++++++++++++++++++++-
.../pyflink/table/tests/test_expression.py | 15 ++-
.../table/tests/test_expression_completeness.py | 4 -
3 files changed, 125 insertions(+), 8 deletions(-)
diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 3d92e10..52cc689 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -16,14 +16,20 @@
# limitations under the License.
################################################################################
from enum import Enum
-from typing import Union, TypeVar, Generic
+from typing import Union, TypeVar, Generic, Any
from pyflink import add_version_doc
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.table.types import DataType, DataTypes, _to_java_data_type
from pyflink.util.java_utils import to_jarray
-__all__ = ['Expression', 'TimeIntervalUnit', 'TimePointUnit']
+# Public names exported by ``from pyflink.table.expression import *``; the two
+# JSON behavior enums are listed so they are importable alongside Expression.
+__all__ = [
+    'Expression',
+    'TimeIntervalUnit',
+    'TimePointUnit',
+    'JsonExistsOnError',
+    'JsonValueOnEmptyOrError'
+]
_aggregation_doc = """
@@ -291,6 +297,14 @@ def _ternary_op(op_name: str):
return _
+def _varargs_op(op_name: str):
+    """
+    Returns a method that forwards an arbitrary number of arguments to the Java
+    expression method named ``op_name``. Every argument is first converted with
+    ``_get_java_expression`` and the Java result is wrapped in an ``Expression``.
+    """
+    def _(self, *args) -> 'Expression':
+        # Resolve the Java method before converting the arguments, matching the
+        # evaluation order of a single call expression.
+        j_method = getattr(self._j_expr, op_name)
+        j_args = [_get_java_expression(arg) for arg in args]
+        return Expression(j_method(*j_args))
+
+    return _
+
+
def _expressions_op(op_name: str):
def _(self, *args) -> 'Expression':
from pyflink.table import expressions
@@ -352,6 +366,37 @@ class TimePointUnit(Enum):
return getattr(JTimePointUnit, self.name)
+class JsonExistsOnError(Enum):
+    """
+    Behavior in case of errors for json_exists().
+
+    Members are translated to the Java enum
+    ``org.apache.flink.table.api.JsonExistsOnError`` by name.
+    """
+
+    # Plain ints: a trailing comma after each value would silently turn it into
+    # a 1-tuple such as (0,), leaving the member values inconsistent.
+    TRUE = 0
+    FALSE = 1
+    UNKNOWN = 2
+    ERROR = 3
+
+    def _to_j_json_exists_on_error(self):
+        # Lookup is by member name, so the Java enum must use the same names.
+        gateway = get_gateway()
+        JJsonExistsOnError = gateway.jvm.org.apache.flink.table.api.JsonExistsOnError
+        return getattr(JJsonExistsOnError, self.name)
+
+
+class JsonValueOnEmptyOrError(Enum):
+    """
+    Behavior in case of emptiness or errors for json_value().
+
+    Members are translated to the Java enum
+    ``org.apache.flink.table.api.JsonValueOnEmptyOrError`` by name.
+    """
+
+    # Plain ints: a trailing comma after each value would silently turn it into
+    # a 1-tuple such as (0,), leaving the member values inconsistent.
+    NULL = 0
+    ERROR = 1
+    DEFAULT = 2
+
+    def _to_j_json_value_on_empty_or_error(self):
+        # Lookup is by member name, so the Java enum must use the same names.
+        gateway = get_gateway()
+        JJsonValueOnEmptyOrError = \
+            gateway.jvm.org.apache.flink.table.api.JsonValueOnEmptyOrError
+        return getattr(JJsonValueOnEmptyOrError, self.name)
+
+
T = TypeVar('T')
@@ -1354,6 +1399,69 @@ class Expression(Generic[T]):
"""
return _binary_op("sha2")(self, hash_length)
+ # ---------------------------- JSON functions -----------------------------
+
+    def json_exists(self, path: str, on_error: JsonExistsOnError = None) -> 'Expression[bool]':
+        """
+        Determines whether a JSON string satisfies a given search criterion.
+
+        This follows the ISO/IEC TR 19075-6 specification for JSON support in SQL.
+
+        :param path: JSON path expression to evaluate against this JSON string.
+        :param on_error: Behavior in case of errors. When omitted (``None``), the
+                         two-argument Java ``jsonExists`` overload is called and
+                         no error behavior is passed on.
+        :return: A boolean expression.
+
+        Examples:
+        ::
+
+            >>> lit('{"a": true}').json_exists('$.a')  # True
+            >>> lit('{"a": true}').json_exists('$.b')  # False
+            >>> lit('{"a": [{ "b": 1 }]}').json_exists('$.a[0].b')  # True
+
+            >>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.TRUE)  # True
+            >>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.FALSE)  # False
+        """
+        if on_error is None:
+            return _binary_op("jsonExists")(self, path)
+        return _ternary_op("jsonExists")(self, path, on_error._to_j_json_exists_on_error())
+
+    def json_value(self,
+                   path: str,
+                   returning_type: DataType = DataTypes.STRING(),
+                   on_empty: JsonValueOnEmptyOrError = JsonValueOnEmptyOrError.NULL,
+                   default_on_empty: Any = None,
+                   on_error: JsonValueOnEmptyOrError = JsonValueOnEmptyOrError.NULL,
+                   default_on_error: Any = None) -> 'Expression':
+        """
+        Extracts a scalar from a JSON string.
+
+        This method searches a JSON string for a given path expression and returns the value if
+        the value at that path is scalar. Non-scalar values cannot be returned. By default, the
+        value is returned as `DataTypes.STRING()`. Using `returning_type` a different type can be
+        chosen, with the following types being supported:
+
+        * `STRING`
+        * `BOOLEAN`
+        * `INT`
+        * `DOUBLE`
+
+        For empty path expressions or errors a behavior can be defined to either return `null`,
+        raise an error or return a defined default value instead.
+
+        :param path: JSON path expression to evaluate against this JSON string.
+        :param returning_type: Type the extracted scalar is returned as.
+        :param on_empty: Behavior when the path expression yields no value.
+        :param default_on_empty: Value used when `on_empty` is
+                                 `JsonValueOnEmptyOrError.DEFAULT`.
+        :param on_error: Behavior when evaluating the path expression fails.
+        :param default_on_error: Value used when `on_error` is
+                                 `JsonValueOnEmptyOrError.DEFAULT`.
+        :return: An expression of the requested `returning_type`.
+
+        Examples:
+        ::
+
+            >>> lit('{"a": true}').json_value('$.a')
+            >>> lit('{"a": true}').json_value('$.a', DataTypes.BOOLEAN())
+            >>> lit('{"a": true}').json_value('lax $.b',
+            ...     on_empty=JsonValueOnEmptyOrError.DEFAULT, default_on_empty=False)
+            >>> lit('{"a": true}').json_value('strict $.b',
+            ...     on_error=JsonValueOnEmptyOrError.DEFAULT, default_on_error=False)
+        """
+        # All six operands are always forwarded, so the Java side receives an
+        # explicit behavior (and default) for both the empty and the error case.
+        return _varargs_op("jsonValue")(self,
+                                        path,
+                                        _to_java_data_type(returning_type),
+                                        on_empty._to_j_json_value_on_empty_or_error(),
+                                        default_on_empty,
+                                        on_error._to_j_json_value_on_empty_or_error(),
+                                        default_on_error)
+
+
# add the docs
_make_math_log_doc()
_make_math_trigonometric_doc()
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index 1e7ef7b..7ba4621 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -18,7 +18,8 @@
import unittest
from pyflink.table import DataTypes
-from pyflink.table.expression import TimeIntervalUnit, TimePointUnit
+from pyflink.table.expression import TimeIntervalUnit, TimePointUnit,
JsonExistsOnError, \
+ JsonValueOnEmptyOrError
from pyflink.table.expressions import (col, lit, range_, and_, or_,
current_date,
current_time, current_timestamp,
local_time,
local_timestamp, temporal_overlaps,
date_format,
@@ -191,6 +192,18 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
self.assertEqual('sha512(a)', str(expr1.sha512))
self.assertEqual('sha2(a, 224)', str(expr1.sha2(224)))
+ # json functions
+ self.assertEqual("JSON_EXISTS('{}', '$.x')",
str(lit('{}').json_exists('$.x')))
+ self.assertEqual("JSON_EXISTS('{}', '$.x', FALSE)",
+ str(lit('{}').json_exists('$.x',
JsonExistsOnError.FALSE)))
+
+ self.assertEqual("JSON_VALUE('{}', '$.x', STRING, NULL, null, NULL,
null)",
+ str(lit('{}').json_value('$.x')))
+ self.assertEqual("JSON_VALUE('{}', '$.x', INT, DEFAULT, 42, ERROR,
null)",
+ str(lit('{}').json_value('$.x', DataTypes.INT(),
+
JsonValueOnEmptyOrError.DEFAULT, 42,
+
JsonValueOnEmptyOrError.ERROR, None)))
+
def test_expressions(self):
expr1 = col('a')
expr2 = col('b')
diff --git a/flink-python/pyflink/table/tests/test_expression_completeness.py b/flink-python/pyflink/table/tests/test_expression_completeness.py
index 2d45dff..412aa17 100644
--- a/flink-python/pyflink/table/tests/test_expression_completeness.py
+++ b/flink-python/pyflink/table/tests/test_expression_completeness.py
@@ -41,10 +41,6 @@ class ExpressionCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase
'toExpr',
'getChildren',
- # The following methods need to be implemented still
- 'jsonExists',
- 'jsonValue',
-
# The following methods have been replaced with the built-in
methods in Python,
# such as __and__ for and to be more Pythonic.
'and',