WeiZhong94 commented on a change in pull request #13278:
URL: https://github.com/apache/flink/pull/13278#discussion_r481595190



##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -0,0 +1,543 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.expression import Expression, _get_java_expression, 
TimePointUnit
+from pyflink.table.types import _to_java_data_type, DataType
+from pyflink.table.udf import UserDefinedFunctionWrapper
+from pyflink.util.utils import to_jarray
+
+
+__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 
'UNBOUNDED_ROW',
+           'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 
'current_time',
+           'current_timestamp', 'local_time', 'local_timestamp', 
'temporal_overlaps',
+           'date_format', 'timestamp_diff', 'array', 'row', 'map_', 
'row_interval', 'pi', 'e',
+           'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 
'uuid', 'null_of',
+           'log', 'with_columns', 'without_columns', 'call']
+
+
+def _leaf_op(op_name: str):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)())
+
+
+def _unary_op(op_name: str, arg):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, 
op_name)(_get_java_expression(arg)))
+
+
+def _binary_op(op_name: str, first, second):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second)))
+
+
+def _ternary_op(op_name: str, first, second, third):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third)))
+
+
+def _quaternion_op(op_name: str, first, second, third, forth):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third),
+        _get_java_expression(forth)))
+
+
+def _add_version_doc():
+    from inspect import getmembers, isfunction
+    from pyflink.table import expressions
+    for o in getmembers(expressions):
+        if isfunction(o[1]) and not o[0].startswith('_'):
+            add_version_doc(o[1], "1.12.0")
+
+
+def col(name: str):
+    """
+    Creates an expression which refers to a table's field.
+
+    Example:
+    ::
+
+        >>> tab.select(col("key"), col("value"))
+
+    :param name: the field name to refer to
+    """
+    return _unary_op("$", name)
+
+
+def lit(v, data_type: DataType = None):
+    """
+    Creates a SQL literal.
+
+    The data type is derived from the object's class and its value. For 
example, `lit(12)` leads
+    to `INT`, `lit("abc")` leads to `CHAR(3)`.
+
+    Example:
+    ::
+
+        >>> tab.select(col("key"), lit("abc"))
+    """
+    if data_type is None:
+        return _unary_op("lit", v)
+    else:
+        return _binary_op("lit", v, _to_java_data_type(data_type))
+
+
+def range_(start: Union[str, int], end: Union[str, int]):
+    """
+    Indicates a range from 'start' to 'end', which can be used in columns 
selection.
+
+    Example:
+    ::
+
+        >>> tab.select(with_columns(range_('b', 'c')))
+
+    .. seealso:: :func:`~pyflink.table.expressions.with_columns`
+    """
+    return _binary_op("range", start, end)
+
+
+def and_(predicate0: Union[bool, 'Expression[bool]'],
+         predicate1: Union[bool, 'Expression[bool]'],
+         *predicates: Union[bool, 'Expression[bool]']) -> 'Expression[bool]':
+    """
+    Boolean AND in three-valued logic.
+    """
+    gateway = get_gateway()
+    predicates = to_jarray(gateway.jvm.Object, [_get_java_expression(p) for p 
in predicates])
+    return _ternary_op("and", predicate0, predicate1, predicates)
+
+
+def or_(predicate0: Union[bool, 'Expression[bool]'],
+        predicate1: Union[bool, 'Expression[bool]'],
+        *predicates: Union[bool, 'Expression[bool]']) -> 'Expression[bool]':
+    """
+    Boolean OR in three-valued logic.
+    """
+    gateway = get_gateway()
+    predicates = to_jarray(gateway.jvm.Object, [_get_java_expression(p) for p 
in predicates])
+    return _ternary_op("or", predicate0, predicate1, predicates)
+
+
+"""
+Offset constant to be used in the `preceding` clause of unbounded
+:class:`~pyflink.table.window.Over`. Use this constant for a time interval.
+Unbounded over windows start with the first row of a partition.
+
+.. versionadded:: 1.12.0
+"""
+UNBOUNDED_ROW = Expression("UNBOUNDED_ROW")
+
+
+"""
+Offset constant to be used in the `preceding` clause of unbounded
+:class:`~pyflink.table.window.Over` windows. Use this constant for a row-count 
interval.
+Unbounded over windows start with the first row of a partition.
+
+.. versionadded:: 1.12.0
+"""
+UNBOUNDED_RANGE = Expression("UNBOUNDED_RANGE")
+
+
+"""
+Offset constant to be used in the `following` clause of 
:class:`~pyflink.table.window.Over` windows.
+Use this for setting the upper bound of the window to the current row.
+
+.. versionadded:: 1.12.0
+"""
+CURRENT_ROW = Expression("CURRENT_ROW")
+
+
+"""
+Offset constant to be used in the `following` clause of 
:class:`~pyflink.table.window.Over` windows.
+Use this for setting the upper bound of the window to the sort key of the 
current row, i.e.,
+all rows with the same sort key as the current row are included in the window.
+
+.. versionadded:: 1.12.0
+"""
+CURRENT_RANGE = Expression("CURRENT_RANGE")
+
+
+def current_date():
+    """
+    Returns the current SQL date in UTC time zone.
+    """
+    return _leaf_op("currentDate")
+
+
+def current_time():
+    """
+    Returns the current SQL time in UTC time zone.
+    """
+    return _leaf_op("currentTime")
+
+
+def current_timestamp():
+    """
+    Returns the current SQL timestamp in UTC time zone.
+    """
+    return _leaf_op("currentTimestamp")
+
+
+def local_time():
+    """
+    Returns the current SQL time in local time zone.
+    """
+    return _leaf_op("localTime")
+
+
+def local_timestamp():
+    """
+    Returns the current SQL timestamp in local time zone.
+    """
+    return _leaf_op("localTimestamp")
+
+
+def temporal_overlaps(left_time_point, left_temporal, right_time_point, 
right_temporal):
+    """
+    Determines whether two anchored time intervals overlap. Time point and 
temporal are
+    transformed into a range defined by two time points (start, end). The 
function
+    evaluates `left_end >= right_start && right_end >= left_start`.
+
+    e.g.
+        temporal_overlaps(
+            lit("2:55:00").to_time(),
+            lit(1).hours(),
+            lit("3:30:00").to_time(),
+            lit(2).hours()) leads to true.
+
+    :param left_time_point: The left time point
+    :param left_temporal: The time interval from the left time point
+    :param right_time_point: The right time point
+    :param right_temporal: The time interval from the right time point
+    :return: An expression which indicates whether two anchored time intervals 
overlap.
+    """
+    return _quaternion_op("temporalOverlaps",
+                          left_time_point, left_temporal, right_time_point, 
right_temporal)
+
+
+def date_format(timestamp, format):
+    """
+    Formats a timestamp as a string using a specified format.
+    The format must be compatible with MySQL's date formatting syntax as used 
by the
+    date_parse function.
+
+    For example `date_format(col("time"), "%Y, %d %M")` results in strings 
formatted as
+    "2017, 05 May".
+
+    :param timestamp: The timestamp to format as string.
+    :param format: The format of the string.
+    :return: The formatted timestamp as string.
+    """
+    return _binary_op("dateFormat", timestamp, format)
+
+
+def timestamp_diff(time_point_unit: TimePointUnit, time_point1, time_point2):
+    """
+    Returns the (signed) number of 
:class:`~pyflink.table.expression.TimePointUnit` between
+    time_point1 and time_point2.
+
+    For example,
+    `timestamp_diff(TimePointUnit.DAY, lit("2016-06-15").to_date(), 
lit("2016-06-18").to_date()`
+    leads to 3.
+
+    :param time_point_unit: The unit to compute diff.
+    :param time_point1: The first point in time.
+    :param time_point2: The second point in time.
+    :return: The number of intervals as integer value.
+    """
+    return _ternary_op("timestampDiff", 
TimePointUnit._to_j_time_point_unit(time_point_unit),
+                       time_point1, time_point2)
+
+
+def array(head, *tail):
+    """
+    Creates an array of literals.
+
+    Example:
+    ::
+
+        >>> tab.select(array(1, 2, 3))
+    """
+    gateway = get_gateway()
+    tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in 
tail])
+    return _binary_op("array", head, tail)
+
+
+def row(head, *tail):
+    """
+    Creates a row of expressions.
+
+    Example:
+    ::
+
+        >>> tab.select(row("key1", 1))
+    """
+    gateway = get_gateway()
+    tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in 
tail])
+    return _binary_op("row", head, tail)
+
+
+def map_(key, value, *tail):
+    """
+    Creates a map of expressions.
+
+    Example:
+    ::
+
+        >>> tab.select(
+        >>>     map_(
+        >>>         "key1", 1,
+        >>>         "key2", 2,
+        >>>         "key3", 3
+        >>>     ))
+
+    .. note::
+
+        keys and values should have the same types for all entries.
+    """
+    gateway = get_gateway()
+    tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in 
tail])
+    return _ternary_op("map", key, value, tail)
+
+
+def row_interval(rows: int):
+    """
+    Creates an interval of rows.
+
+    Example:
+    ::
+
+        >>> tab.window(Over
+        >>>         .partition_by(col('a'))
+        >>>         .order_by(col('proctime'))
+        >>>         .preceding(row_interval(4))
+        >>>         .following(CURRENT_ROW)
+        >>>         .alias('w'))
+
+    :param rows: the number of rows
+    """
+    return _unary_op("rowInterval", rows)
+
+
+def pi() -> 'Expression[float]':
+    """
+    Returns a value that is closer than any other value to `pi`.
+    """
+    return _leaf_op("pi")
+
+
+def e() -> 'Expression[float]':
+    """
+    Returns a value that is closer than any other value to `e`.
+    """
+    return _leaf_op("e")
+
+
+def rand(seed: 'Union[int, Expression[int]]' = None) -> 'Expression[float]':
+    """
+    Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 
(exclusive) with a
+    initial seed if specified. Two rand() functions will return identical 
sequences of numbers if
+    they have same initial seed.
+    """
+    if seed is None:
+        return _leaf_op("rand")
+    else:
+        return _unary_op("rand", seed)
+
+
+def rand_integer(bound: 'Union[int, Expression[int]]',
+                 seed: 'Union[int, Expression[int]]' = None):
+    """
+    Returns a pseudorandom integer value between 0.0 (inclusive) and the 
specified value
+    (exclusive) with a initial seed if specified. Two rand_integer() functions 
will return
+    identical sequences of numbers if they have same initial seed and same 
bound.
+    """
+    if seed is None:
+        return _unary_op("randInteger", bound)
+    else:
+        return _binary_op("randInteger", seed, bound)
+
+
+def atan2(y, x) -> 'Expression[float]':
+    """
+    Calculates the arc tangent of a given coordinate.
+    """
+    return _binary_op("atan2", y, x)
+
+
+def negative(v):
+    """
+    Returns negative numeric.
+    """
+    return _unary_op("negative", v)
+
+
+def concat(first: 'Union[str, Expression[str]]',
+           *others: 'Union[str, Expression[str]]') -> 'Expression[str]':
+    """
+    Returns the string that results from concatenating the arguments.
+    Returns NULL if any argument is NULL.
+    """
+    gateway = get_gateway()
+    return _binary_op("concat",
+                      first,
+                      to_jarray(gateway.jvm.Object,
+                                [_get_java_expression(other) for other in 
others]))
+
+
+def concat_ws(separator: 'Union[str, Expression[str]]',
+              first: 'Union[str, Expression[str]]',
+              *others: 'Union[str, Expression[str]]'):
+    """
+    Returns the string that results from concatenating the arguments and 
separator.
+    Returns NULL If the separator is NULL.
+
+    .. note::
+
+        this function does not skip empty strings. However, it does skip any 
NULL
+        values after the separator argument.
+    """
+    gateway = get_gateway()
+    return _ternary_op("concatWs",
+                       separator,
+                       first,
+                       to_jarray(gateway.jvm.Object,
+                                 [_get_java_expression(other) for other in 
others]))
+
+
+def uuid() -> Union[str, 'Expression[str]']:

Review comment:
       This function seems always returns a 'Expression[str]'

##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -0,0 +1,543 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.expression import Expression, _get_java_expression, 
TimePointUnit
+from pyflink.table.types import _to_java_data_type, DataType
+from pyflink.table.udf import UserDefinedFunctionWrapper
+from pyflink.util.utils import to_jarray
+
+
+__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 
'UNBOUNDED_ROW',
+           'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 
'current_time',
+           'current_timestamp', 'local_time', 'local_timestamp', 
'temporal_overlaps',
+           'date_format', 'timestamp_diff', 'array', 'row', 'map_', 
'row_interval', 'pi', 'e',
+           'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 
'uuid', 'null_of',
+           'log', 'with_columns', 'without_columns', 'call']
+
+
+def _leaf_op(op_name: str):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)())
+
+
+def _unary_op(op_name: str, arg):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, 
op_name)(_get_java_expression(arg)))
+
+
+def _binary_op(op_name: str, first, second):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second)))
+
+
+def _ternary_op(op_name: str, first, second, third):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third)))
+
+
+def _quaternion_op(op_name: str, first, second, third, forth):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third),
+        _get_java_expression(forth)))
+
+
+def _add_version_doc():
+    from inspect import getmembers, isfunction
+    from pyflink.table import expressions
+    for o in getmembers(expressions):
+        if isfunction(o[1]) and not o[0].startswith('_'):
+            add_version_doc(o[1], "1.12.0")
+
+
+def col(name: str):
+    """
+    Creates an expression which refers to a table's field.
+
+    Example:
+    ::
+
+        >>> tab.select(col("key"), col("value"))
+
+    :param name: the field name to refer to
+    """
+    return _unary_op("$", name)
+
+
+def lit(v, data_type: DataType = None):
+    """
+    Creates a SQL literal.
+
+    The data type is derived from the object's class and its value. For 
example, `lit(12)` leads
+    to `INT`, `lit("abc")` leads to `CHAR(3)`.
+
+    Example:
+    ::
+
+        >>> tab.select(col("key"), lit("abc"))
+    """
+    if data_type is None:
+        return _unary_op("lit", v)
+    else:
+        return _binary_op("lit", v, _to_java_data_type(data_type))
+
+
+def range_(start: Union[str, int], end: Union[str, int]):
+    """
+    Indicates a range from 'start' to 'end', which can be used in columns 
selection.
+
+    Example:
+    ::
+
+        >>> tab.select(with_columns(range_('b', 'c')))
+
+    .. seealso:: :func:`~pyflink.table.expressions.with_columns`
+    """
+    return _binary_op("range", start, end)
+
+
+def and_(predicate0: Union[bool, 'Expression[bool]'],
+         predicate1: Union[bool, 'Expression[bool]'],
+         *predicates: Union[bool, 'Expression[bool]']) -> 'Expression[bool]':
+    """
+    Boolean AND in three-valued logic.
+    """
+    gateway = get_gateway()
+    predicates = to_jarray(gateway.jvm.Object, [_get_java_expression(p) for p 
in predicates])
+    return _ternary_op("and", predicate0, predicate1, predicates)
+
+
+def or_(predicate0: Union[bool, 'Expression[bool]'],
+        predicate1: Union[bool, 'Expression[bool]'],
+        *predicates: Union[bool, 'Expression[bool]']) -> 'Expression[bool]':
+    """
+    Boolean OR in three-valued logic.
+    """
+    gateway = get_gateway()
+    predicates = to_jarray(gateway.jvm.Object, [_get_java_expression(p) for p 
in predicates])
+    return _ternary_op("or", predicate0, predicate1, predicates)
+
+
+"""
+Offset constant to be used in the `preceding` clause of unbounded
+:class:`~pyflink.table.window.Over`. Use this constant for a time interval.
+Unbounded over windows start with the first row of a partition.
+
+.. versionadded:: 1.12.0
+"""
+UNBOUNDED_ROW = Expression("UNBOUNDED_ROW")
+
+
+"""
+Offset constant to be used in the `preceding` clause of unbounded
+:class:`~pyflink.table.window.Over` windows. Use this constant for a row-count 
interval.
+Unbounded over windows start with the first row of a partition.
+
+.. versionadded:: 1.12.0
+"""
+UNBOUNDED_RANGE = Expression("UNBOUNDED_RANGE")
+
+
+"""
+Offset constant to be used in the `following` clause of 
:class:`~pyflink.table.window.Over` windows.
+Use this for setting the upper bound of the window to the current row.
+
+.. versionadded:: 1.12.0
+"""
+CURRENT_ROW = Expression("CURRENT_ROW")
+
+
+"""
+Offset constant to be used in the `following` clause of 
:class:`~pyflink.table.window.Over` windows.
+Use this for setting the upper bound of the window to the sort key of the 
current row, i.e.,
+all rows with the same sort key as the current row are included in the window.
+
+.. versionadded:: 1.12.0
+"""
+CURRENT_RANGE = Expression("CURRENT_RANGE")
+
+
+def current_date():
+    """
+    Returns the current SQL date in UTC time zone.
+    """
+    return _leaf_op("currentDate")
+
+
+def current_time():
+    """
+    Returns the current SQL time in UTC time zone.
+    """
+    return _leaf_op("currentTime")
+
+
+def current_timestamp():
+    """
+    Returns the current SQL timestamp in UTC time zone.
+    """
+    return _leaf_op("currentTimestamp")
+
+
+def local_time():
+    """
+    Returns the current SQL time in local time zone.
+    """
+    return _leaf_op("localTime")
+
+
+def local_timestamp():
+    """
+    Returns the current SQL timestamp in local time zone.
+    """
+    return _leaf_op("localTimestamp")
+
+
+def temporal_overlaps(left_time_point, left_temporal, right_time_point, 
right_temporal):
+    """
+    Determines whether two anchored time intervals overlap. Time point and 
temporal are
+    transformed into a range defined by two time points (start, end). The 
function
+    evaluates `left_end >= right_start && right_end >= left_start`.
+
+    e.g.
+        temporal_overlaps(
+            lit("2:55:00").to_time(),
+            lit(1).hours(),
+            lit("3:30:00").to_time(),
+            lit(2).hours()) leads to true.
+
+    :param left_time_point: The left time point
+    :param left_temporal: The time interval from the left time point
+    :param right_time_point: The right time point
+    :param right_temporal: The time interval from the right time point
+    :return: An expression which indicates whether two anchored time intervals 
overlap.
+    """
+    return _quaternion_op("temporalOverlaps",
+                          left_time_point, left_temporal, right_time_point, 
right_temporal)
+
+
+def date_format(timestamp, format):
+    """
+    Formats a timestamp as a string using a specified format.
+    The format must be compatible with MySQL's date formatting syntax as used 
by the
+    date_parse function.
+
+    For example `date_format(col("time"), "%Y, %d %M")` results in strings 
formatted as
+    "2017, 05 May".
+
+    :param timestamp: The timestamp to format as string.
+    :param format: The format of the string.
+    :return: The formatted timestamp as string.
+    """
+    return _binary_op("dateFormat", timestamp, format)
+
+
+def timestamp_diff(time_point_unit: TimePointUnit, time_point1, time_point2):
+    """
+    Returns the (signed) number of 
:class:`~pyflink.table.expression.TimePointUnit` between
+    time_point1 and time_point2.
+
+    For example,
+    `timestamp_diff(TimePointUnit.DAY, lit("2016-06-15").to_date(), 
lit("2016-06-18").to_date()`
+    leads to 3.
+
+    :param time_point_unit: The unit to compute diff.
+    :param time_point1: The first point in time.
+    :param time_point2: The second point in time.
+    :return: The number of intervals as integer value.
+    """
+    return _ternary_op("timestampDiff", 
TimePointUnit._to_j_time_point_unit(time_point_unit),
+                       time_point1, time_point2)
+
+
+def array(head, *tail):
+    """
+    Creates an array of literals.
+
+    Example:
+    ::
+
+        >>> tab.select(array(1, 2, 3))
+    """
+    gateway = get_gateway()
+    tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in 
tail])
+    return _binary_op("array", head, tail)
+
+
+def row(head, *tail):
+    """
+    Creates a row of expressions.
+
+    Example:
+    ::
+
+        >>> tab.select(row("key1", 1))
+    """
+    gateway = get_gateway()
+    tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in 
tail])
+    return _binary_op("row", head, tail)
+
+
+def map_(key, value, *tail):
+    """
+    Creates a map of expressions.
+
+    Example:
+    ::
+
+        >>> tab.select(
+        >>>     map_(
+        >>>         "key1", 1,
+        >>>         "key2", 2,
+        >>>         "key3", 3
+        >>>     ))
+
+    .. note::
+
+        keys and values should have the same types for all entries.
+    """
+    gateway = get_gateway()
+    tail = to_jarray(gateway.jvm.Object, [_get_java_expression(t) for t in 
tail])
+    return _ternary_op("map", key, value, tail)
+
+
+def row_interval(rows: int):
+    """
+    Creates an interval of rows.
+
+    Example:
+    ::
+
+        >>> tab.window(Over
+        >>>         .partition_by(col('a'))
+        >>>         .order_by(col('proctime'))
+        >>>         .preceding(row_interval(4))
+        >>>         .following(CURRENT_ROW)
+        >>>         .alias('w'))
+
+    :param rows: the number of rows
+    """
+    return _unary_op("rowInterval", rows)
+
+
+def pi() -> 'Expression[float]':
+    """
+    Returns a value that is closer than any other value to `pi`.
+    """
+    return _leaf_op("pi")
+
+
+def e() -> 'Expression[float]':
+    """
+    Returns a value that is closer than any other value to `e`.
+    """
+    return _leaf_op("e")
+
+
+def rand(seed: 'Union[int, Expression[int]]' = None) -> 'Expression[float]':
+    """
+    Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 
(exclusive) with a
+    initial seed if specified. Two rand() functions will return identical 
sequences of numbers if
+    they have same initial seed.
+    """
+    if seed is None:
+        return _leaf_op("rand")
+    else:
+        return _unary_op("rand", seed)
+
+
+def rand_integer(bound: 'Union[int, Expression[int]]',
+                 seed: 'Union[int, Expression[int]]' = None):
+    """
+    Returns a pseudorandom integer value between 0.0 (inclusive) and the 
specified value
+    (exclusive) with a initial seed if specified. Two rand_integer() functions 
will return
+    identical sequences of numbers if they have same initial seed and same 
bound.
+    """
+    if seed is None:
+        return _unary_op("randInteger", bound)
+    else:
+        return _binary_op("randInteger", seed, bound)
+
+
+def atan2(y, x) -> 'Expression[float]':
+    """
+    Calculates the arc tangent of a given coordinate.
+    """
+    return _binary_op("atan2", y, x)
+
+
+def negative(v):
+    """
+    Returns negative numeric.
+    """
+    return _unary_op("negative", v)
+
+
+def concat(first: 'Union[str, Expression[str]]',
+           *others: 'Union[str, Expression[str]]') -> 'Expression[str]':
+    """
+    Returns the string that results from concatenating the arguments.
+    Returns NULL if any argument is NULL.
+    """
+    gateway = get_gateway()
+    return _binary_op("concat",
+                      first,
+                      to_jarray(gateway.jvm.Object,
+                                [_get_java_expression(other) for other in 
others]))
+
+
+def concat_ws(separator: 'Union[str, Expression[str]]',
+              first: 'Union[str, Expression[str]]',
+              *others: 'Union[str, Expression[str]]'):

Review comment:
       This function seems always returns a 'Expression[str]'

##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -0,0 +1,543 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.expression import Expression, _get_java_expression, 
TimePointUnit
+from pyflink.table.types import _to_java_data_type, DataType
+from pyflink.table.udf import UserDefinedFunctionWrapper
+from pyflink.util.utils import to_jarray
+
+
+__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 
'UNBOUNDED_ROW',
+           'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 
'current_time',
+           'current_timestamp', 'local_time', 'local_timestamp', 
'temporal_overlaps',
+           'date_format', 'timestamp_diff', 'array', 'row', 'map_', 
'row_interval', 'pi', 'e',
+           'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 
'uuid', 'null_of',
+           'log', 'with_columns', 'without_columns', 'call']
+
+
+def _leaf_op(op_name: str):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)())
+
+
+def _unary_op(op_name: str, arg):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, 
op_name)(_get_java_expression(arg)))
+
+
+def _binary_op(op_name: str, first, second):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second)))
+
+
+def _ternary_op(op_name: str, first, second, third):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third)))
+
+
+def _quaternion_op(op_name: str, first, second, third, forth):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third),
+        _get_java_expression(forth)))
+
+
+def _add_version_doc():
+    from inspect import getmembers, isfunction
+    from pyflink.table import expressions
+    for o in getmembers(expressions):
+        if isfunction(o[1]) and not o[0].startswith('_'):
+            add_version_doc(o[1], "1.12.0")
+
+
+def col(name: str):

Review comment:
       We should add the return type hint for all the interfaces to fully 
utilize the IDE’s type checking.

##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -0,0 +1,543 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.expression import Expression, _get_java_expression, 
TimePointUnit
+from pyflink.table.types import _to_java_data_type, DataType
+from pyflink.table.udf import UserDefinedFunctionWrapper
+from pyflink.util.utils import to_jarray
+
+
+__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 
'UNBOUNDED_ROW',
+           'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 
'current_time',
+           'current_timestamp', 'local_time', 'local_timestamp', 
'temporal_overlaps',
+           'date_format', 'timestamp_diff', 'array', 'row', 'map_', 
'row_interval', 'pi', 'e',
+           'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 
'uuid', 'null_of',
+           'log', 'with_columns', 'without_columns', 'call']
+
+
+def _leaf_op(op_name: str):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)())
+
+
+def _unary_op(op_name: str, arg):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, 
op_name)(_get_java_expression(arg)))
+
+
+def _binary_op(op_name: str, first, second):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second)))
+
+
+def _ternary_op(op_name: str, first, second, third):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third)))
+
+
+def _quaternion_op(op_name: str, first, second, third, forth):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third),
+        _get_java_expression(forth)))
+
+
+def _add_version_doc():
+    from inspect import getmembers, isfunction
+    from pyflink.table import expressions
+    for o in getmembers(expressions):
+        if isfunction(o[1]) and not o[0].startswith('_'):
+            add_version_doc(o[1], "1.12.0")
+
+
+def col(name: str):
+    """
+    Creates an expression which refers to a table's field.
+
+    Example:
+    ::
+
+        >>> tab.select(col("key"), col("value"))
+
+    :param name: the field name to refer to
+    """
+    return _unary_op("$", name)
+
+
+def lit(v, data_type: DataType = None):
+    """
+    Creates a SQL literal.
+
+    The data type is derived from the object's class and its value. For 
example, `lit(12)` leads
+    to `INT`, `lit("abc")` leads to `CHAR(3)`.
+
+    Example:
+    ::
+
+        >>> tab.select(col("key"), lit("abc"))
+    """
+    if data_type is None:
+        return _unary_op("lit", v)
+    else:
+        return _binary_op("lit", v, _to_java_data_type(data_type))
+
+
+def range_(start: Union[str, int], end: Union[str, int]):
+    """
+    Indicates a range from 'start' to 'end', which can be used in columns 
selection.
+
+    Example:
+    ::
+
+        >>> tab.select(with_columns(range_('b', 'c')))
+
+    .. seealso:: :func:`~pyflink.table.expressions.with_columns`
+    """
+    return _binary_op("range", start, end)
+
+
+def and_(predicate0: Union[bool, 'Expression[bool]'],

Review comment:
       Union[bool, 'Expression[bool]'] -> 'Union[bool, Expression[bool]]'?
   It would be better if we unify the writing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to