This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba9b4f1  [FLINK-23757][python] Support json_exists and json_value
ba9b4f1 is described below

commit ba9b4f1039037d4a2da4f90a2f83612ccf2fa708
Author: Ingo Bürk <[email protected]>
AuthorDate: Wed Aug 18 09:15:23 2021 +0200

    [FLINK-23757][python] Support json_exists and json_value
    
    This closes #16874.
---
 flink-python/pyflink/table/expression.py           | 114 ++++++++++++++++++++-
 .../pyflink/table/tests/test_expression.py         |  15 ++-
 .../table/tests/test_expression_completeness.py    |   4 -
 3 files changed, 125 insertions(+), 8 deletions(-)

diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 3d92e10..52cc689 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -16,14 +16,20 @@
 # limitations under the License.
 ################################################################################
 from enum import Enum
-from typing import Union, TypeVar, Generic
+from typing import Union, TypeVar, Generic, Any
 
 from pyflink import add_version_doc
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.table.types import DataType, DataTypes, _to_java_data_type
 from pyflink.util.java_utils import to_jarray
 
-__all__ = ['Expression', 'TimeIntervalUnit', 'TimePointUnit']
+__all__ = [
+    'Expression',
+    'TimeIntervalUnit',
+    'TimePointUnit',
+    'JsonExistsOnError',
+    'JsonValueOnEmptyOrError'
+]
 
 
 _aggregation_doc = """
@@ -291,6 +297,14 @@ def _ternary_op(op_name: str):
     return _
 
 
+def _varargs_op(op_name: str):
+    def _(self, *args) -> 'Expression':
+        return Expression(
+            getattr(self._j_expr, op_name)(*[_get_java_expression(arg) for arg in args]))
+
+    return _
+
+
 def _expressions_op(op_name: str):
     def _(self, *args) -> 'Expression':
         from pyflink.table import expressions
@@ -352,6 +366,37 @@ class TimePointUnit(Enum):
         return getattr(JTimePointUnit, self.name)
 
 
+class JsonExistsOnError(Enum):
+    """
+    Behavior in case of errors for json_exists().
+    """
+
+    TRUE = 0
+    FALSE = 1
+    UNKNOWN = 2
+    ERROR = 3
+
+    def _to_j_json_exists_on_error(self):
+        gateway = get_gateway()
+        JJsonExistsOnError = gateway.jvm.org.apache.flink.table.api.JsonExistsOnError
+        return getattr(JJsonExistsOnError, self.name)
+
+
+class JsonValueOnEmptyOrError(Enum):
+    """
+    Behavior in case of emptiness or errors for json_value().
+    """
+
+    NULL = 0
+    ERROR = 1
+    DEFAULT = 2
+
+    def _to_j_json_value_on_empty_or_error(self):
+        gateway = get_gateway()
+        JJsonValueOnEmptyOrError = gateway.jvm.org.apache.flink.table.api.JsonValueOnEmptyOrError
+        return getattr(JJsonValueOnEmptyOrError, self.name)
+
+
 T = TypeVar('T')
 
 
@@ -1354,6 +1399,69 @@ class Expression(Generic[T]):
         """
         return _binary_op("sha2")(self, hash_length)
 
+    # ---------------------------- JSON functions -----------------------------
+
+    def json_exists(self, path: str, on_error: JsonExistsOnError = None) -> 'Expression[bool]':
+        """
+        Determines whether a JSON string satisfies a given search criterion.
+
+        This follows the ISO/IEC TR 19075-6 specification for JSON support in SQL.
+
+        Examples:
+        ::
+
+            >>> lit('{"a": true}').json_exists('$.a')  # True
+            >>> lit('{"a": true}').json_exists('$.b')  # False
+            >>> lit('{"a": [{ "b": 1 }]}').json_exists('$.a[0].b')  # True
+
+            >>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.TRUE)  # True
+            >>> lit('{"a": true}').json_exists('strict $.b', JsonExistsOnError.FALSE)  # False
+        """
+        if on_error is None:
+            return _binary_op("jsonExists")(self, path)
+        else:
+            return _ternary_op("jsonExists")(self, path, on_error._to_j_json_exists_on_error())
+
+    def json_value(self,
+                   path: str,
+                   returning_type: DataType = DataTypes.STRING(),
+                   on_empty: JsonValueOnEmptyOrError = JsonValueOnEmptyOrError.NULL,
+                   default_on_empty: Any = None,
+                   on_error: JsonValueOnEmptyOrError = JsonValueOnEmptyOrError.NULL,
+                   default_on_error: Any = None) -> 'Expression':
+        """
+        Extracts a scalar from a JSON string.
+
+        This method searches a JSON string for a given path expression and returns the value if the
+        value at that path is scalar. Non-scalar values cannot be returned. By default, the value is
+        returned as `DataTypes.STRING()`. Using `returning_type`, another type can be chosen, with
+        the following types being supported:
+
+        * `STRING`
+        * `BOOLEAN`
+        * `INT`
+        * `DOUBLE`
+
+        For empty path expressions or errors a behavior can be defined to either return `null`,
+        raise an error or return a defined default value instead.
+
+        Examples:
+        ::
+
+            >>> lit('{"a": true}').json_value('$.a')
+            >>> lit('{"a": true}').json_value('$.a', DataTypes.BOOLEAN())
+            >>> lit('{"a": true}').json_value('lax $.b',
+            ...     DataTypes.BOOLEAN(), JsonValueOnEmptyOrError.DEFAULT, False)
+            >>> lit('{"a": true}').json_value('strict $.b', DataTypes.BOOLEAN(),
+            ...     JsonValueOnEmptyOrError.NULL, None, JsonValueOnEmptyOrError.DEFAULT, False)
+        """
+        return _varargs_op("jsonValue")(self, path, _to_java_data_type(returning_type),
+                                        on_empty._to_j_json_value_on_empty_or_error(),
+                                        default_on_empty,
+                                        on_error._to_j_json_value_on_empty_or_error(),
+                                        default_on_error)
+
+
 # add the docs
 _make_math_log_doc()
 _make_math_trigonometric_doc()
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index 1e7ef7b..7ba4621 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -18,7 +18,8 @@
 import unittest
 
 from pyflink.table import DataTypes
-from pyflink.table.expression import TimeIntervalUnit, TimePointUnit
+from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, JsonExistsOnError, \
+    JsonValueOnEmptyOrError
 from pyflink.table.expressions import (col, lit, range_, and_, or_, current_date,
                                        current_time, current_timestamp, local_time,
                                        local_timestamp, temporal_overlaps, date_format,
@@ -191,6 +192,18 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('sha512(a)', str(expr1.sha512))
         self.assertEqual('sha2(a, 224)', str(expr1.sha2(224)))
 
+        # json functions
+        self.assertEqual("JSON_EXISTS('{}', '$.x')", str(lit('{}').json_exists('$.x')))
+        self.assertEqual("JSON_EXISTS('{}', '$.x', FALSE)",
+                         str(lit('{}').json_exists('$.x', JsonExistsOnError.FALSE)))
+
+        self.assertEqual("JSON_VALUE('{}', '$.x', STRING, NULL, null, NULL, null)",
+                         str(lit('{}').json_value('$.x')))
+        self.assertEqual("JSON_VALUE('{}', '$.x', INT, DEFAULT, 42, ERROR, null)",
+                         str(lit('{}').json_value('$.x', DataTypes.INT(),
+                                                  JsonValueOnEmptyOrError.DEFAULT, 42,
+                                                  JsonValueOnEmptyOrError.ERROR, None)))
+
     def test_expressions(self):
         expr1 = col('a')
         expr2 = col('b')
diff --git a/flink-python/pyflink/table/tests/test_expression_completeness.py b/flink-python/pyflink/table/tests/test_expression_completeness.py
index 2d45dff..412aa17 100644
--- a/flink-python/pyflink/table/tests/test_expression_completeness.py
+++ b/flink-python/pyflink/table/tests/test_expression_completeness.py
@@ -41,10 +41,6 @@ class ExpressionCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase
             'toExpr',
             'getChildren',
 
-            # The following methods need to be implemented still
-            'jsonExists',
-            'jsonValue',
-
             # The following methods have been replaced with the built-in methods in Python,
             # such as __and__ for and to be more Pythonic.
             'and',

Reply via email to