WeiZhong94 commented on a change in pull request #13278:
URL: https://github.com/apache/flink/pull/13278#discussion_r480911264



##########
File path: flink-python/pyflink/table/tests/test_expression.py
##########
@@ -0,0 +1,253 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+
+from pyflink.table import DataTypes
+from pyflink.table.expression import TimeIntervalUnit, TimePointUnit
+from pyflink.table.expressions import (col, lit, range_, and_, or_, 
current_date,
+                                       current_time, current_timestamp, 
local_time,
+                                       local_timestamp, temporal_overlaps, 
date_format,
+                                       timestamp_diff, array, row, map_, 
row_interval, pi, e,
+                                       rand, rand_integer, atan2, negative, 
concat, concat_ws, uuid,
+                                       null_of, log, if_then_else, 
with_columns, call)
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class PyFlinkBlinkBatchExpressionTests(PyFlinkTestCase):
+
+    def test_expression(self):
+        expr1 = col('a')
+        expr2 = col('b')
+        expr3 = col('c')
+        expr4 = col('d')
+        expr5 = lit(10)
+
+        # comparison functions
+        self.assertEqual('equals(a, b)', str(expr1 == expr2))
+        self.assertEqual('notEquals(a, b)', str(expr1 != expr2))
+        self.assertEqual('lessThan(a, b)', str(expr1 < expr2))
+        self.assertEqual('lessThanOrEqual(a, b)', str(expr1 <= expr2))
+        self.assertEqual('greaterThan(a, b)', str(expr1 > expr2))
+        self.assertEqual('greaterThanOrEqual(a, b)', str(expr1 >= expr2))
+
+        # logic functions
+        self.assertEqual('and(a, b)', str(expr1 & expr2))
+        self.assertEqual('or(a, b)', str(expr1 | expr2))
+        self.assertEqual('isNotTrue(a)', str(expr1.is_not_true()))
+        self.assertEqual('isNotTrue(a)', str(~expr1))
+
+        # arithmetic functions
+        self.assertEqual('plus(a, b)', str(expr1 + expr2))
+        self.assertEqual('minus(a, b)', str(expr1 - expr2))
+        self.assertEqual('times(a, b)', str(expr1 * expr2))
+        self.assertEqual('divide(a, b)', str(expr1 / expr2))
+        self.assertEqual('mod(a, b)', str(expr1 % expr2))

Review comment:
       It would be better to add some tests between constants and expression.

##########
File path: flink-python/pyflink/table/expression.py
##########
@@ -0,0 +1,1332 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.util.utils import to_jarray
+
+__all__ = ['Expression', 'TimeIntervalUnit', 'TimePointUnit']
+
+
+_aggregation_doc = """
+{op_desc}
+
+Example:
+::
+
+    >>> tab \\
+    >>>     .group_by(col("a")) \\
+    >>>     .select(col("a"),
+    >>>             col("b").sum().alias("d"),
+    >>>             col("b").sum0().alias("e"),
+    >>>             col("b").min().alias("f"),
+    >>>             col("b").max().alias("g"),
+    >>>             col("b").count().alias("h"),
+    >>>             col("b").avg().alias("i"),
+    >>>             col("b").stddev_pop().alias("j"),
+    >>>             col("b").stddev_samp().alias("k"),
+    >>>             col("b").var_pop().alias("l"),
+    >>>             col("b").var_samp().alias("m"),
+    >>>             col("b").collect().alias("n"))
+
+.. seealso:: :func:`~Expression.sum`, :func:`~Expression.sum0`, 
:func:`~Expression.min`,
+             :func:`~Expression.max`, :func:`~Expression.count`, 
:func:`~Expression.avg`,
+             :func:`~Expression.stddev_pop`, :func:`~Expression.stddev_samp`,
+             :func:`~Expression.var_pop`, :func:`~Expression.var_samp`,
+             :func:`~Expression.collect`
+"""
+
+
+_math_log_doc = """
+{op_desc}
+
+.. seealso:: :func:`~Expression.log10`, :func:`~Expression.log2`, 
:func:`~Expression.ln`,
+             :func:`~Expression.log`
+"""
+
+
+_math_trigonometric_doc = """
+Calculates the {op_desc} of a given number.
+
+.. seealso:: :func:`~Expression.sin`, :func:`~Expression.cos`, 
:func:`~Expression.sinh`,
+             :func:`~Expression.cosh`, :func:`~Expression.tan`, 
:func:`~Expression.cot`,
+             :func:`~Expression.asin`, :func:`~Expression.acos`, 
:func:`~Expression.atan`,
+             :func:`~Expression.tanh`
+"""
+
+_string_doc_seealso = """
+.. seealso:: :func:`~Expression.trim_leading`, 
:func:`~Expression.trim_trailing`,
+             :func:`~Expression.trim`, :func:`~Expression.replace`, 
:func:`~Expression.char_length`,
+             :func:`~Expression.upper_case`, :func:`~Expression.lower_case`,
+             :func:`~Expression.init_cap`, :func:`~Expression.like`, 
:func:`~Expression.similar`,
+             :func:`~Expression.position`, :func:`~Expression.lpad`, 
:func:`~Expression.rpad`,
+             :func:`~Expression.overlay`, :func:`~Expression.regexp_replace`,
+             :func:`~Expression.regexp_extract`, :func:`~Expression.substring`,
+             :func:`~Expression.from_base64`, :func:`~Expression.to_base64`,
+             :func:`~Expression.ltrim`, :func:`~Expression.rtrim`, 
:func:`~Expression.repeat`
+"""
+
+_temporal_doc_seealso = """
+.. seealso:: :func:`~Expression.to_date`, :func:`~Expression.to_time`,
+             :func:`~Expression.to_timestamp`, :func:`~Expression.extract`,
+             :func:`~Expression.floor`, :func:`~Expression.ceil`
+"""
+
+
+_time_doc = """
+Creates an interval of the given number of {op_desc}.
+
+The produced expression is of type :func:`~DataTypes.INTERVAL`.
+
+.. seealso:: :func:`~Expression.year`, :func:`~Expression.years`, 
:func:`~Expression.quarter`,
+             :func:`~Expression.quarters`, :func:`~Expression.month`, 
:func:`~Expression.months`,
+             :func:`~Expression.week`, :func:`~Expression.weeks`, 
:func:`~Expression.day`,
+             :func:`~Expression.days`, :func:`~Expression.hour`, 
:func:`~Expression.hours`,
+             :func:`~Expression.minute`, :func:`~Expression.minutes`, 
:func:`~Expression.second`,
+             :func:`~Expression.seconds`, :func:`~Expression.milli`, 
:func:`~Expression.millis`
+"""
+
+
+_hash_doc = """
+Returns the {op_desc} hash of the string argument; null if string is null.
+
+:return: string of {bit} hexadecimal digits or null.
+
+.. seealso:: :func:`~Expression.md5`, :func:`~Expression.sha1`, 
:func:`~Expression.sha224`,
+             :func:`~Expression.sha256`, :func:`~Expression.sha384`, 
:func:`~Expression.sha512`,
+             :func:`~Expression.sha2`
+"""
+
+
+def _make_math_log_doc():
+    math_log_funcs = {
+        Expression.log10: "Calculates the base 10 logarithm of the given 
value.",
+        Expression.log2: "Calculates the base 2 logarithm of the given value.",
+        Expression.ln: "Calculates the natural logarithm of the given value.",
+        Expression.log: "Calculates the natural logarithm of the given value 
if base is not "
+                        "specified. Otherwise, calculates the logarithm of the 
given value to the "
+                        "given base.",
+    }
+
+    for func, op_desc in math_log_funcs.items():
+        func.__doc__ = _math_log_doc.format(op_desc=op_desc)
+
+
+def _make_math_trigonometric_doc():
+    math_trigonometric_funcs = {
+        Expression.cosh: "hyperbolic cosine",
+        Expression.sinh: "hyperbolic sine",
+        Expression.sin: "sine",
+        Expression.cos: "cosine",
+        Expression.tan: "tangent",
+        Expression.cot: "cotangent",
+        Expression.asin: "arc sine",
+        Expression.acos: "arc cosine",
+        Expression.atan: "arc tangent",
+        Expression.tanh: "hyperbolic tangent",
+    }
+
+    for func, op_desc in math_trigonometric_funcs.items():
+        func.__doc__ = _math_trigonometric_doc.format(op_desc=op_desc)
+
+
+def _make_aggregation_doc():
+    aggregation_funcs = {
+        Expression.sum: "Returns the sum of the numeric field across all input 
values. "
+                        "If all values are null, null is returned.",
+        Expression.sum0: "Returns the sum of the numeric field across all 
input values. "
+                        "If all values are null, 0 is returned.",
+        Expression.min: "Returns the minimum value of field across all input 
values.",
+        Expression.max: "Returns the maximum value of field across all input 
values.",
+        Expression.count: "Returns the number of input rows for which the 
field is not null.",
+        Expression.avg: "Returns the average (arithmetic mean) of the numeric 
field across all "
+                        "input values.",
+        Expression.stddev_pop: "Returns the population standard deviation of 
an expression(the "
+                               "square root of varPop()).",
+        Expression.stddev_samp: "Returns the sample standard deviation of an 
expression(the square "
+                                "root of varSamp()).",
+        Expression.var_pop: "Returns the population standard variance of an 
expression.",
+        Expression.var_samp: "Returns the sample variance of a given 
expression.",
+        Expression.collect: "Returns multiset aggregate of a given 
expression.",
+    }
+
+    for func, op_desc in aggregation_funcs.items():
+        func.__doc__ = _aggregation_doc.format(op_desc=op_desc)
+
+
+def _make_string_doc():
+    string_funcs = [
+        Expression.substring, Expression.trim_leading, 
Expression.trim_trailing, Expression.trim,
+        Expression.replace, Expression.char_length, Expression.upper_case, 
Expression.lower_case,
+        Expression.init_cap, Expression.like, Expression.similar, 
Expression.position,
+        Expression.lpad, Expression.rpad, Expression.overlay, 
Expression.regexp_replace,
+        Expression.regexp_extract, Expression.from_base64, 
Expression.to_base64,
+        Expression.ltrim, Expression.rtrim, Expression.repeat
+    ]
+
+    for func in string_funcs:
+        func.__doc__ = func.__doc__.replace('  ', '') + _string_doc_seealso
+
+
+def _make_temporal_doc():
+    temporal_funcs = [
+        Expression.to_date, Expression.to_time, Expression.to_timestamp, 
Expression.extract,
+        Expression.floor, Expression.ceil
+    ]
+
+    for func in temporal_funcs:
+        func.__doc__ = func.__doc__.replace('  ', '') + _temporal_doc_seealso
+
+
+def _make_time_doc():
+    time_funcs = {
+        Expression.year: "years",
+        Expression.years: "years",
+        Expression.quarter: "quarters",
+        Expression.quarters: "quarters",
+        Expression.month: "months",
+        Expression.months: "months",
+        Expression.week: "weeks",
+        Expression.weeks: "weeks",
+        Expression.day: "days",
+        Expression.days: "days",
+        Expression.hour: "hours",
+        Expression.hours: "hours",
+        Expression.minute: "minutes",
+        Expression.minutes: "minutes",
+        Expression.second: "seconds",
+        Expression.seconds: "seconds",
+        Expression.milli: "millis",
+        Expression.millis: "millis"
+    }
+
+    for func, op_desc in time_funcs.items():
+        func.__doc__ = _time_doc.format(op_desc=op_desc)
+
+
+def _make_hash_doc():
+    hash_funcs = {
+        Expression.md5: ("MD5", 32),
+        Expression.sha1: ("SHA-1", 40),
+        Expression.sha224: ("SHA-224", 56),
+        Expression.sha256: ("SHA-256", 64),
+        Expression.sha384: ("SHA-384", 96),
+        Expression.sha512: ("SHA-512", 128)
+    }
+
+    for func, (op_desc, bit) in hash_funcs.items():
+        func.__doc__ = _hash_doc.format(op_desc=op_desc, bit=bit)
+
+
+def _add_version_doc():
+    for func_name in dir(Expression):
+        if not func_name.startswith("_"):
+            add_version_doc(getattr(Expression, func_name), "1.12.0")
+
+
+def _get_java_expression(expr):
+    return expr._j_expr if isinstance(expr, Expression) else expr
+
+
+def _get_or_create_java_expression(expr: Union["Expression", str]):
+    if isinstance(expr, Expression):
+        return expr._j_expr
+    elif isinstance(expr, str):
+        from pyflink.table.expressions import col
+        return col(expr)._j_expr
+    else:
+        raise TypeError(
+            "Invalid argument: expected Expression or string, got 
{0}.".format(type(expr)))
+
+
+def _unary_op(op_name: str):
+    def _(self):
+        return Expression(getattr(self._j_expr, op_name)())
+
+    return _
+
+
+def _binary_op(op_name: str, reverse: bool = False):
+    def _(self, other):
+        if reverse:
+            return Expression(getattr(_get_java_expression(other), 
op_name)(self._j_expr))
+        else:
+            return Expression(getattr(self._j_expr, 
op_name)(_get_java_expression(other)))
+
+    return _
+
+
+def _ternary_op(op_name: str):
+    def _(self, first, second):
+        return Expression(getattr(self._j_expr, op_name)(
+            _get_java_expression(first), _get_java_expression(second)))
+
+    return _
+
+
+def _expressions_op(op_name: str):
+    def _(self, *args):
+        from pyflink.table import expressions
+        return getattr(expressions, op_name)(self, *[_get_java_expression(arg) 
for arg in args])
+
+    return _
+
+
+class TimeIntervalUnit(object):
+    """
+    Units for working with time intervals.
+
+    .. versionadded:: 1.12.0
+    """
+
+    YEAR = 0,
+    YEAR_TO_MONTH = 1,
+    QUARTER = 2,
+    MONTH = 3,
+    WEEK = 4,
+    DAY = 5,
+    DAY_TO_HOUR = 6,
+    DAY_TO_MINUTE = 7,
+    DAY_TO_SECOND = 8,
+    HOUR = 9,
+    SECOND = 10,
+    HOUR_TO_MINUTE = 11,
+    HOUR_TO_SECOND = 12,
+    MINUTE = 13,
+    MINUTE_TO_SECOND = 14
+
+    @staticmethod
+    def _from_j_time_interval_unit(j_time_interval_unit):
+        gateway = get_gateway()
+        JTimeIntervalUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimeIntervalUnit
+        if j_time_interval_unit == JTimeIntervalUnit.YEAR:
+            return TimeIntervalUnit.YEAR
+        elif j_time_interval_unit == JTimeIntervalUnit.YEAR_TO_MONTH:
+            return TimeIntervalUnit.YEAR_TO_MONTH
+        elif j_time_interval_unit == JTimeIntervalUnit.QUARTER:
+            return TimeIntervalUnit.QUARTER
+        elif j_time_interval_unit == JTimeIntervalUnit.MONTH:
+            return TimeIntervalUnit.MONTH
+        elif j_time_interval_unit == JTimeIntervalUnit.WEEK:
+            return TimeIntervalUnit.WEEK
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY:
+            return TimeIntervalUnit.DAY
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_HOUR:
+            return TimeIntervalUnit.DAY_TO_HOUR
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_MINUTE:
+            return TimeIntervalUnit.DAY_TO_MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_SECOND:
+            return TimeIntervalUnit.DAY_TO_SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR:
+            return TimeIntervalUnit.HOUR
+        elif j_time_interval_unit == JTimeIntervalUnit.SECOND:
+            return TimeIntervalUnit.SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR_TO_MINUTE:
+            return TimeIntervalUnit.HOUR_TO_MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR_TO_SECOND:
+            return TimeIntervalUnit.HOUR_TO_SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.MINUTE:
+            return TimeIntervalUnit.MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.MINUTE_TO_SECOND:
+            return TimeIntervalUnit.MINUTE_TO_SECOND
+        else:
+            raise Exception("Unsupported Java time interval unit: %s." % 
j_time_interval_unit)
+
+    @staticmethod
+    def _to_j_time_interval_unit(time_interval_unit):
+        gateway = get_gateway()
+        JTimeIntervalUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimeIntervalUnit
+        if time_interval_unit == TimeIntervalUnit.YEAR:
+            j_time_interval_unit = JTimeIntervalUnit.YEAR
+        elif time_interval_unit == TimeIntervalUnit.YEAR_TO_MONTH:
+            j_time_interval_unit = JTimeIntervalUnit.YEAR_TO_MONTH
+        elif time_interval_unit == TimeIntervalUnit.QUARTER:
+            j_time_interval_unit = JTimeIntervalUnit.QUARTER
+        elif time_interval_unit == TimeIntervalUnit.MONTH:
+            j_time_interval_unit = JTimeIntervalUnit.MONTH
+        elif time_interval_unit == TimeIntervalUnit.WEEK:
+            j_time_interval_unit = JTimeIntervalUnit.WEEK
+        elif time_interval_unit == TimeIntervalUnit.DAY:
+            j_time_interval_unit = JTimeIntervalUnit.DAY
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_HOUR:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_HOUR
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_MINUTE
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_SECOND
+        elif time_interval_unit == TimeIntervalUnit.HOUR:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR
+        elif time_interval_unit == TimeIntervalUnit.SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.SECOND
+        elif time_interval_unit == TimeIntervalUnit.HOUR_TO_MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR_TO_MINUTE
+        elif time_interval_unit == TimeIntervalUnit.HOUR_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR_TO_SECOND
+        elif time_interval_unit == TimeIntervalUnit.MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.MINUTE
+        elif time_interval_unit == TimeIntervalUnit.MINUTE_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.MINUTE_TO_SECOND
+        else:
+            raise TypeError("Unsupported time interval unit: %s, supported 
time interval unit "
+                            "are: YEAR, YEAR_TO_MONTH, QUARTER, MONTH, WEEK, 
DAY, DAY_TO_HOUR, "
+                            "DAY_TO_MINUTE, DAY_TO_SECOND, HOUR, SECOND, 
HOUR_TO_MINUTE, "
+                            "HOUR_TO_SECOND, MINUTE, MINUTE_TO_SECOND" % 
time_interval_unit)
+        return j_time_interval_unit
+
+
+class TimePointUnit(object):
+    """
+    Units for working with points in time.
+
+    .. versionadded:: 1.12.0
+    """
+
+    YEAR = 0,
+    MONTH = 1,
+    DAY = 2,
+    HOUR = 3,
+    MINUTE = 4,
+    SECOND = 5,
+    QUARTER = 6,
+    WEEK = 7,
+    MILLISECOND = 8,
+    MICROSECOND = 9
+
+    @staticmethod
+    def _from_j_time_point_unit(j_time_point_unit):
+        gateway = get_gateway()
+        JTimePointUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimePointUnit
+        if j_time_point_unit == JTimePointUnit.YEAR:
+            return TimePointUnit.YEAR
+        elif j_time_point_unit == JTimePointUnit.MONTH:
+            return TimePointUnit.MONTH
+        elif j_time_point_unit == JTimePointUnit.DAY:
+            return TimePointUnit.DAY
+        elif j_time_point_unit == JTimePointUnit.HOUR:
+            return TimePointUnit.HOUR
+        elif j_time_point_unit == JTimePointUnit.MINUTE:
+            return TimePointUnit.MINUTE
+        elif j_time_point_unit == JTimePointUnit.SECOND:
+            return TimePointUnit.SECOND
+        elif j_time_point_unit == JTimePointUnit.QUARTER:
+            return TimePointUnit.QUARTER
+        elif j_time_point_unit == JTimePointUnit.WEEK:
+            return TimePointUnit.WEEK
+        elif j_time_point_unit == JTimePointUnit.MILLISECOND:
+            return TimePointUnit.MILLISECOND
+        elif j_time_point_unit == JTimePointUnit.MICROSECOND:
+            return TimePointUnit.MICROSECOND
+        else:
+            raise Exception("Unsupported Java time point unit: %s." % 
j_time_point_unit)
+
+    @staticmethod
+    def _to_j_time_point_unit(time_point_unit):
+        gateway = get_gateway()
+        JTimePointUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimePointUnit
+        if time_point_unit == TimePointUnit.YEAR:
+            j_time_point_unit = JTimePointUnit.YEAR
+        elif time_point_unit == TimePointUnit.MONTH:
+            j_time_point_unit = JTimePointUnit.MONTH
+        elif time_point_unit == TimePointUnit.DAY:
+            j_time_point_unit = JTimePointUnit.DAY
+        elif time_point_unit == TimePointUnit.HOUR:
+            j_time_point_unit = JTimePointUnit.HOUR
+        elif time_point_unit == TimePointUnit.MINUTE:
+            j_time_point_unit = JTimePointUnit.MINUTE
+        elif time_point_unit == TimePointUnit.SECOND:
+            j_time_point_unit = JTimePointUnit.SECOND
+        elif time_point_unit == TimePointUnit.QUARTER:
+            j_time_point_unit = JTimePointUnit.QUARTER
+        elif time_point_unit == TimePointUnit.WEEK:
+            j_time_point_unit = JTimePointUnit.WEEK
+        elif time_point_unit == TimePointUnit.MILLISECOND:
+            j_time_point_unit = JTimePointUnit.MILLISECOND
+        elif time_point_unit == TimePointUnit.MICROSECOND:
+            j_time_point_unit = JTimePointUnit.MICROSECOND
+        else:
+            raise TypeError("Unsupported time point unit: %s, supported time 
point unit are: "
+                            "YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, QUARTER, "
+                            "WEEK, MILLISECOND, MICROSECOND" % time_point_unit)
+        return j_time_point_unit
+
+
+class Expression(object):
+    """
+    Expressions represent a logical tree for producing a computation result.
+    Expressions might be literal values, function calls, or field references.
+
+    .. versionadded:: 1.12.0
+    """
+
+    def __init__(self, j_expr_or_property_name):
+        self._j_expr_or_property_name = j_expr_or_property_name
+
+    __abs__ = _unary_op("abs")
+
+    # comparison functions
+    __eq__ = _binary_op("isEqual")
+    __ne__ = _binary_op("isNotEqual")
+    __lt__ = _binary_op("isLess")
+    __gt__ = _binary_op("isGreater")
+    __le__ = _binary_op("isLessOrEqual")
+    __ge__ = _binary_op("isGreaterOrEqual")
+
+    # logic functions
+    __and__ = _binary_op("and")
+    __or__ = _binary_op("or")
+    __invert__ = _unary_op('isNotTrue')
+
+    __rand__ = _binary_op("and")
+    __ror__ = _binary_op("or")
+
+    # arithmetic functions
+    __add__ = _binary_op("plus")
+    __sub__ = _binary_op("minus")
+    __mul__ = _binary_op("times")
+    __truediv__ = _binary_op("dividedBy")
+    __mod__ = _binary_op("mod")
+    __pow__ = _binary_op("power")
+    __neg__ = _expressions_op("negative")
+
+    __radd__ = _binary_op("plus")

Review comment:
       The plus operation between time variable and time interval may not obey 
the commutative law.

##########
File path: flink-python/pyflink/table/expression.py
##########
@@ -0,0 +1,1332 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.util.utils import to_jarray
+
+__all__ = ['Expression', 'TimeIntervalUnit', 'TimePointUnit']
+
+
+_aggregation_doc = """
+{op_desc}
+
+Example:
+::
+
+    >>> tab \\
+    >>>     .group_by(col("a")) \\
+    >>>     .select(col("a"),
+    >>>             col("b").sum().alias("d"),
+    >>>             col("b").sum0().alias("e"),
+    >>>             col("b").min().alias("f"),
+    >>>             col("b").max().alias("g"),
+    >>>             col("b").count().alias("h"),
+    >>>             col("b").avg().alias("i"),
+    >>>             col("b").stddev_pop().alias("j"),
+    >>>             col("b").stddev_samp().alias("k"),
+    >>>             col("b").var_pop().alias("l"),
+    >>>             col("b").var_samp().alias("m"),
+    >>>             col("b").collect().alias("n"))
+
+.. seealso:: :func:`~Expression.sum`, :func:`~Expression.sum0`, 
:func:`~Expression.min`,
+             :func:`~Expression.max`, :func:`~Expression.count`, 
:func:`~Expression.avg`,
+             :func:`~Expression.stddev_pop`, :func:`~Expression.stddev_samp`,
+             :func:`~Expression.var_pop`, :func:`~Expression.var_samp`,
+             :func:`~Expression.collect`
+"""
+
+
+_math_log_doc = """
+{op_desc}
+
+.. seealso:: :func:`~Expression.log10`, :func:`~Expression.log2`, 
:func:`~Expression.ln`,
+             :func:`~Expression.log`
+"""
+
+
+_math_trigonometric_doc = """
+Calculates the {op_desc} of a given number.
+
+.. seealso:: :func:`~Expression.sin`, :func:`~Expression.cos`, 
:func:`~Expression.sinh`,
+             :func:`~Expression.cosh`, :func:`~Expression.tan`, 
:func:`~Expression.cot`,
+             :func:`~Expression.asin`, :func:`~Expression.acos`, 
:func:`~Expression.atan`,
+             :func:`~Expression.tanh`
+"""
+
+_string_doc_seealso = """
+.. seealso:: :func:`~Expression.trim_leading`, 
:func:`~Expression.trim_trailing`,
+             :func:`~Expression.trim`, :func:`~Expression.replace`, 
:func:`~Expression.char_length`,
+             :func:`~Expression.upper_case`, :func:`~Expression.lower_case`,
+             :func:`~Expression.init_cap`, :func:`~Expression.like`, 
:func:`~Expression.similar`,
+             :func:`~Expression.position`, :func:`~Expression.lpad`, 
:func:`~Expression.rpad`,
+             :func:`~Expression.overlay`, :func:`~Expression.regexp_replace`,
+             :func:`~Expression.regexp_extract`, :func:`~Expression.substring`,
+             :func:`~Expression.from_base64`, :func:`~Expression.to_base64`,
+             :func:`~Expression.ltrim`, :func:`~Expression.rtrim`, 
:func:`~Expression.repeat`
+"""
+
+_temporal_doc_seealso = """
+.. seealso:: :func:`~Expression.to_date`, :func:`~Expression.to_time`,
+             :func:`~Expression.to_timestamp`, :func:`~Expression.extract`,
+             :func:`~Expression.floor`, :func:`~Expression.ceil`
+"""
+
+
+_time_doc = """
+Creates an interval of the given number of {op_desc}.
+
+The produced expression is of type :func:`~DataTypes.INTERVAL`.
+
+.. seealso:: :func:`~Expression.year`, :func:`~Expression.years`, 
:func:`~Expression.quarter`,
+             :func:`~Expression.quarters`, :func:`~Expression.month`, 
:func:`~Expression.months`,
+             :func:`~Expression.week`, :func:`~Expression.weeks`, 
:func:`~Expression.day`,
+             :func:`~Expression.days`, :func:`~Expression.hour`, 
:func:`~Expression.hours`,
+             :func:`~Expression.minute`, :func:`~Expression.minutes`, 
:func:`~Expression.second`,
+             :func:`~Expression.seconds`, :func:`~Expression.milli`, 
:func:`~Expression.millis`
+"""
+
+
+_hash_doc = """
+Returns the {op_desc} hash of the string argument; null if string is null.
+
+:return: string of {bit} hexadecimal digits or null.
+
+.. seealso:: :func:`~Expression.md5`, :func:`~Expression.sha1`, 
:func:`~Expression.sha224`,
+             :func:`~Expression.sha256`, :func:`~Expression.sha384`, 
:func:`~Expression.sha512`,
+             :func:`~Expression.sha2`
+"""
+
+
+def _make_math_log_doc():
+    math_log_funcs = {
+        Expression.log10: "Calculates the base 10 logarithm of the given 
value.",
+        Expression.log2: "Calculates the base 2 logarithm of the given value.",
+        Expression.ln: "Calculates the natural logarithm of the given value.",
+        Expression.log: "Calculates the natural logarithm of the given value 
if base is not "
+                        "specified. Otherwise, calculates the logarithm of the 
given value to the "
+                        "given base.",
+    }
+
+    for func, op_desc in math_log_funcs.items():
+        func.__doc__ = _math_log_doc.format(op_desc=op_desc)
+
+
+def _make_math_trigonometric_doc():
+    math_trigonometric_funcs = {
+        Expression.cosh: "hyperbolic cosine",
+        Expression.sinh: "hyperbolic sine",
+        Expression.sin: "sine",
+        Expression.cos: "cosine",
+        Expression.tan: "tangent",
+        Expression.cot: "cotangent",
+        Expression.asin: "arc sine",
+        Expression.acos: "arc cosine",
+        Expression.atan: "arc tangent",
+        Expression.tanh: "hyperbolic tangent",
+    }
+
+    for func, op_desc in math_trigonometric_funcs.items():
+        func.__doc__ = _math_trigonometric_doc.format(op_desc=op_desc)
+
+
+def _make_aggregation_doc():
+    aggregation_funcs = {
+        Expression.sum: "Returns the sum of the numeric field across all input 
values. "
+                        "If all values are null, null is returned.",
+        Expression.sum0: "Returns the sum of the numeric field across all 
input values. "
+                        "If all values are null, 0 is returned.",
+        Expression.min: "Returns the minimum value of field across all input 
values.",
+        Expression.max: "Returns the maximum value of field across all input 
values.",
+        Expression.count: "Returns the number of input rows for which the 
field is not null.",
+        Expression.avg: "Returns the average (arithmetic mean) of the numeric 
field across all "
+                        "input values.",
+        Expression.stddev_pop: "Returns the population standard deviation of 
an expression(the "
+                               "square root of varPop()).",
+        Expression.stddev_samp: "Returns the sample standard deviation of an 
expression(the square "
+                                "root of varSamp()).",
+        Expression.var_pop: "Returns the population standard variance of an 
expression.",
+        Expression.var_samp: "Returns the sample variance of a given 
expression.",
+        Expression.collect: "Returns multiset aggregate of a given 
expression.",
+    }
+
+    for func, op_desc in aggregation_funcs.items():
+        func.__doc__ = _aggregation_doc.format(op_desc=op_desc)
+
+
+def _make_string_doc():
+    string_funcs = [
+        Expression.substring, Expression.trim_leading, 
Expression.trim_trailing, Expression.trim,
+        Expression.replace, Expression.char_length, Expression.upper_case, 
Expression.lower_case,
+        Expression.init_cap, Expression.like, Expression.similar, 
Expression.position,
+        Expression.lpad, Expression.rpad, Expression.overlay, 
Expression.regexp_replace,
+        Expression.regexp_extract, Expression.from_base64, 
Expression.to_base64,
+        Expression.ltrim, Expression.rtrim, Expression.repeat
+    ]
+
+    for func in string_funcs:
+        func.__doc__ = func.__doc__.replace('  ', '') + _string_doc_seealso
+
+
+def _make_temporal_doc():
+    temporal_funcs = [
+        Expression.to_date, Expression.to_time, Expression.to_timestamp, 
Expression.extract,
+        Expression.floor, Expression.ceil
+    ]
+
+    for func in temporal_funcs:
+        func.__doc__ = func.__doc__.replace('  ', '') + _temporal_doc_seealso
+
+
+def _make_time_doc():
+    time_funcs = {
+        Expression.year: "years",
+        Expression.years: "years",
+        Expression.quarter: "quarters",
+        Expression.quarters: "quarters",
+        Expression.month: "months",
+        Expression.months: "months",
+        Expression.week: "weeks",
+        Expression.weeks: "weeks",
+        Expression.day: "days",
+        Expression.days: "days",
+        Expression.hour: "hours",
+        Expression.hours: "hours",
+        Expression.minute: "minutes",
+        Expression.minutes: "minutes",
+        Expression.second: "seconds",
+        Expression.seconds: "seconds",
+        Expression.milli: "millis",
+        Expression.millis: "millis"
+    }
+
+    for func, op_desc in time_funcs.items():
+        func.__doc__ = _time_doc.format(op_desc=op_desc)
+
+
+def _make_hash_doc():
+    hash_funcs = {
+        Expression.md5: ("MD5", 32),
+        Expression.sha1: ("SHA-1", 40),
+        Expression.sha224: ("SHA-224", 56),
+        Expression.sha256: ("SHA-256", 64),
+        Expression.sha384: ("SHA-384", 96),
+        Expression.sha512: ("SHA-512", 128)
+    }
+
+    for func, (op_desc, bit) in hash_funcs.items():
+        func.__doc__ = _hash_doc.format(op_desc=op_desc, bit=bit)
+
+
+def _add_version_doc():
+    for func_name in dir(Expression):
+        if not func_name.startswith("_"):
+            add_version_doc(getattr(Expression, func_name), "1.12.0")
+
+
+def _get_java_expression(expr):
+    return expr._j_expr if isinstance(expr, Expression) else expr
+
+
+def _get_or_create_java_expression(expr: Union["Expression", str]):
+    if isinstance(expr, Expression):
+        return expr._j_expr
+    elif isinstance(expr, str):
+        from pyflink.table.expressions import col
+        return col(expr)._j_expr
+    else:
+        raise TypeError(
+            "Invalid argument: expected Expression or string, got 
{0}.".format(type(expr)))
+
+
+def _unary_op(op_name: str):
+    def _(self):
+        return Expression(getattr(self._j_expr, op_name)())
+
+    return _
+
+
+def _binary_op(op_name: str, reverse: bool = False):
+    def _(self, other):
+        if reverse:
+            return Expression(getattr(_get_java_expression(other), 
op_name)(self._j_expr))
+        else:
+            return Expression(getattr(self._j_expr, 
op_name)(_get_java_expression(other)))
+
+    return _
+
+
+def _ternary_op(op_name: str):
+    def _(self, first, second):
+        return Expression(getattr(self._j_expr, op_name)(
+            _get_java_expression(first), _get_java_expression(second)))
+
+    return _
+
+
+def _expressions_op(op_name: str):
+    def _(self, *args):
+        from pyflink.table import expressions
+        return getattr(expressions, op_name)(self, *[_get_java_expression(arg) 
for arg in args])
+
+    return _
+
+
+class TimeIntervalUnit(object):
+    """
+    Units for working with time intervals.
+
+    .. versionadded:: 1.12.0
+    """
+
+    YEAR = 0,
+    YEAR_TO_MONTH = 1,
+    QUARTER = 2,
+    MONTH = 3,
+    WEEK = 4,
+    DAY = 5,
+    DAY_TO_HOUR = 6,
+    DAY_TO_MINUTE = 7,
+    DAY_TO_SECOND = 8,
+    HOUR = 9,
+    SECOND = 10,
+    HOUR_TO_MINUTE = 11,
+    HOUR_TO_SECOND = 12,
+    MINUTE = 13,
+    MINUTE_TO_SECOND = 14
+
+    @staticmethod
+    def _from_j_time_interval_unit(j_time_interval_unit):
+        gateway = get_gateway()
+        JTimeIntervalUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimeIntervalUnit
+        if j_time_interval_unit == JTimeIntervalUnit.YEAR:
+            return TimeIntervalUnit.YEAR
+        elif j_time_interval_unit == JTimeIntervalUnit.YEAR_TO_MONTH:
+            return TimeIntervalUnit.YEAR_TO_MONTH
+        elif j_time_interval_unit == JTimeIntervalUnit.QUARTER:
+            return TimeIntervalUnit.QUARTER
+        elif j_time_interval_unit == JTimeIntervalUnit.MONTH:
+            return TimeIntervalUnit.MONTH
+        elif j_time_interval_unit == JTimeIntervalUnit.WEEK:
+            return TimeIntervalUnit.WEEK
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY:
+            return TimeIntervalUnit.DAY
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_HOUR:
+            return TimeIntervalUnit.DAY_TO_HOUR
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_MINUTE:
+            return TimeIntervalUnit.DAY_TO_MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_SECOND:
+            return TimeIntervalUnit.DAY_TO_SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR:
+            return TimeIntervalUnit.HOUR
+        elif j_time_interval_unit == JTimeIntervalUnit.SECOND:
+            return TimeIntervalUnit.SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR_TO_MINUTE:
+            return TimeIntervalUnit.HOUR_TO_MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR_TO_SECOND:
+            return TimeIntervalUnit.HOUR_TO_SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.MINUTE:
+            return TimeIntervalUnit.MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.MINUTE_TO_SECOND:
+            return TimeIntervalUnit.MINUTE_TO_SECOND
+        else:
+            raise Exception("Unsupported Java time interval unit: %s." % 
j_time_interval_unit)
+
+    @staticmethod
+    def _to_j_time_interval_unit(time_interval_unit):
+        gateway = get_gateway()
+        JTimeIntervalUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimeIntervalUnit
+        if time_interval_unit == TimeIntervalUnit.YEAR:
+            j_time_interval_unit = JTimeIntervalUnit.YEAR
+        elif time_interval_unit == TimeIntervalUnit.YEAR_TO_MONTH:
+            j_time_interval_unit = JTimeIntervalUnit.YEAR_TO_MONTH
+        elif time_interval_unit == TimeIntervalUnit.QUARTER:
+            j_time_interval_unit = JTimeIntervalUnit.QUARTER
+        elif time_interval_unit == TimeIntervalUnit.MONTH:
+            j_time_interval_unit = JTimeIntervalUnit.MONTH
+        elif time_interval_unit == TimeIntervalUnit.WEEK:
+            j_time_interval_unit = JTimeIntervalUnit.WEEK
+        elif time_interval_unit == TimeIntervalUnit.DAY:
+            j_time_interval_unit = JTimeIntervalUnit.DAY
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_HOUR:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_HOUR
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_MINUTE
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_SECOND
+        elif time_interval_unit == TimeIntervalUnit.HOUR:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR
+        elif time_interval_unit == TimeIntervalUnit.SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.SECOND
+        elif time_interval_unit == TimeIntervalUnit.HOUR_TO_MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR_TO_MINUTE
+        elif time_interval_unit == TimeIntervalUnit.HOUR_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR_TO_SECOND
+        elif time_interval_unit == TimeIntervalUnit.MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.MINUTE
+        elif time_interval_unit == TimeIntervalUnit.MINUTE_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.MINUTE_TO_SECOND
+        else:
+            raise TypeError("Unsupported time interval unit: %s, supported 
time interval unit "
+                            "are: YEAR, YEAR_TO_MONTH, QUARTER, MONTH, WEEK, 
DAY, DAY_TO_HOUR, "
+                            "DAY_TO_MINUTE, DAY_TO_SECOND, HOUR, SECOND, 
HOUR_TO_MINUTE, "
+                            "HOUR_TO_SECOND, MINUTE, MINUTE_TO_SECOND" % 
time_interval_unit)
+        return j_time_interval_unit
+
+
+class TimePointUnit(object):
+    """
+    Units for working with points in time.
+
+    .. versionadded:: 1.12.0
+    """
+
+    YEAR = 0,
+    MONTH = 1,
+    DAY = 2,
+    HOUR = 3,
+    MINUTE = 4,
+    SECOND = 5,
+    QUARTER = 6,
+    WEEK = 7,
+    MILLISECOND = 8,
+    MICROSECOND = 9
+
+    @staticmethod
+    def _from_j_time_point_unit(j_time_point_unit):
+        gateway = get_gateway()
+        JTimePointUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimePointUnit
+        if j_time_point_unit == JTimePointUnit.YEAR:
+            return TimePointUnit.YEAR
+        elif j_time_point_unit == JTimePointUnit.MONTH:
+            return TimePointUnit.MONTH
+        elif j_time_point_unit == JTimePointUnit.DAY:
+            return TimePointUnit.DAY
+        elif j_time_point_unit == JTimePointUnit.HOUR:
+            return TimePointUnit.HOUR
+        elif j_time_point_unit == JTimePointUnit.MINUTE:
+            return TimePointUnit.MINUTE
+        elif j_time_point_unit == JTimePointUnit.SECOND:
+            return TimePointUnit.SECOND
+        elif j_time_point_unit == JTimePointUnit.QUARTER:
+            return TimePointUnit.QUARTER
+        elif j_time_point_unit == JTimePointUnit.WEEK:
+            return TimePointUnit.WEEK
+        elif j_time_point_unit == JTimePointUnit.MILLISECOND:
+            return TimePointUnit.MILLISECOND
+        elif j_time_point_unit == JTimePointUnit.MICROSECOND:
+            return TimePointUnit.MICROSECOND
+        else:
+            raise Exception("Unsupported Java time point unit: %s." % 
j_time_point_unit)
+
+    @staticmethod
+    def _to_j_time_point_unit(time_point_unit):
+        gateway = get_gateway()
+        JTimePointUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimePointUnit
+        if time_point_unit == TimePointUnit.YEAR:
+            j_time_point_unit = JTimePointUnit.YEAR
+        elif time_point_unit == TimePointUnit.MONTH:
+            j_time_point_unit = JTimePointUnit.MONTH
+        elif time_point_unit == TimePointUnit.DAY:
+            j_time_point_unit = JTimePointUnit.DAY
+        elif time_point_unit == TimePointUnit.HOUR:
+            j_time_point_unit = JTimePointUnit.HOUR
+        elif time_point_unit == TimePointUnit.MINUTE:
+            j_time_point_unit = JTimePointUnit.MINUTE
+        elif time_point_unit == TimePointUnit.SECOND:
+            j_time_point_unit = JTimePointUnit.SECOND
+        elif time_point_unit == TimePointUnit.QUARTER:
+            j_time_point_unit = JTimePointUnit.QUARTER
+        elif time_point_unit == TimePointUnit.WEEK:
+            j_time_point_unit = JTimePointUnit.WEEK
+        elif time_point_unit == TimePointUnit.MILLISECOND:
+            j_time_point_unit = JTimePointUnit.MILLISECOND
+        elif time_point_unit == TimePointUnit.MICROSECOND:
+            j_time_point_unit = JTimePointUnit.MICROSECOND
+        else:
+            raise TypeError("Unsupported time point unit: %s, supported time 
point unit are: "
+                            "YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, QUARTER, "
+                            "WEEK, MILLISECOND, MICROSECOND" % time_point_unit)
+        return j_time_point_unit
+
+
+class Expression(object):

Review comment:
       It would be better if we add type hints with generics for the 
`Expression` class.

##########
File path: flink-python/pyflink/table/expression.py
##########
@@ -0,0 +1,1332 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.util.utils import to_jarray
+
+__all__ = ['Expression', 'TimeIntervalUnit', 'TimePointUnit']
+
+
+_aggregation_doc = """
+{op_desc}
+
+Example:
+::
+
+    >>> tab \\
+    >>>     .group_by(col("a")) \\
+    >>>     .select(col("a"),
+    >>>             col("b").sum().alias("d"),
+    >>>             col("b").sum0().alias("e"),
+    >>>             col("b").min().alias("f"),
+    >>>             col("b").max().alias("g"),
+    >>>             col("b").count().alias("h"),
+    >>>             col("b").avg().alias("i"),
+    >>>             col("b").stddev_pop().alias("j"),
+    >>>             col("b").stddev_samp().alias("k"),
+    >>>             col("b").var_pop().alias("l"),
+    >>>             col("b").var_samp().alias("m"),
+    >>>             col("b").collect().alias("n"))
+
+.. seealso:: :func:`~Expression.sum`, :func:`~Expression.sum0`, 
:func:`~Expression.min`,
+             :func:`~Expression.max`, :func:`~Expression.count`, 
:func:`~Expression.avg`,
+             :func:`~Expression.stddev_pop`, :func:`~Expression.stddev_samp`,
+             :func:`~Expression.var_pop`, :func:`~Expression.var_samp`,
+             :func:`~Expression.collect`
+"""
+
+
+_math_log_doc = """
+{op_desc}
+
+.. seealso:: :func:`~Expression.log10`, :func:`~Expression.log2`, 
:func:`~Expression.ln`,
+             :func:`~Expression.log`
+"""
+
+
+_math_trigonometric_doc = """
+Calculates the {op_desc} of a given number.
+
+.. seealso:: :func:`~Expression.sin`, :func:`~Expression.cos`, 
:func:`~Expression.sinh`,
+             :func:`~Expression.cosh`, :func:`~Expression.tan`, 
:func:`~Expression.cot`,
+             :func:`~Expression.asin`, :func:`~Expression.acos`, 
:func:`~Expression.atan`,
+             :func:`~Expression.tanh`
+"""
+
+_string_doc_seealso = """
+.. seealso:: :func:`~Expression.trim_leading`, 
:func:`~Expression.trim_trailing`,
+             :func:`~Expression.trim`, :func:`~Expression.replace`, 
:func:`~Expression.char_length`,
+             :func:`~Expression.upper_case`, :func:`~Expression.lower_case`,
+             :func:`~Expression.init_cap`, :func:`~Expression.like`, 
:func:`~Expression.similar`,
+             :func:`~Expression.position`, :func:`~Expression.lpad`, 
:func:`~Expression.rpad`,
+             :func:`~Expression.overlay`, :func:`~Expression.regexp_replace`,
+             :func:`~Expression.regexp_extract`, :func:`~Expression.substring`,
+             :func:`~Expression.from_base64`, :func:`~Expression.to_base64`,
+             :func:`~Expression.ltrim`, :func:`~Expression.rtrim`, 
:func:`~Expression.repeat`
+"""
+
+_temporal_doc_seealso = """
+.. seealso:: :func:`~Expression.to_date`, :func:`~Expression.to_time`,
+             :func:`~Expression.to_timestamp`, :func:`~Expression.extract`,
+             :func:`~Expression.floor`, :func:`~Expression.ceil`
+"""
+
+
+_time_doc = """
+Creates an interval of the given number of {op_desc}.
+
+The produced expression is of type :func:`~DataTypes.INTERVAL`.
+
+.. seealso:: :func:`~Expression.year`, :func:`~Expression.years`, 
:func:`~Expression.quarter`,
+             :func:`~Expression.quarters`, :func:`~Expression.month`, 
:func:`~Expression.months`,
+             :func:`~Expression.week`, :func:`~Expression.weeks`, 
:func:`~Expression.day`,
+             :func:`~Expression.days`, :func:`~Expression.hour`, 
:func:`~Expression.hours`,
+             :func:`~Expression.minute`, :func:`~Expression.minutes`, 
:func:`~Expression.second`,
+             :func:`~Expression.seconds`, :func:`~Expression.milli`, 
:func:`~Expression.millis`
+"""
+
+
+_hash_doc = """
+Returns the {op_desc} hash of the string argument; null if string is null.
+
+:return: string of {bit} hexadecimal digits or null.
+
+.. seealso:: :func:`~Expression.md5`, :func:`~Expression.sha1`, 
:func:`~Expression.sha224`,
+             :func:`~Expression.sha256`, :func:`~Expression.sha384`, 
:func:`~Expression.sha512`,
+             :func:`~Expression.sha2`
+"""
+
+
+def _make_math_log_doc():
+    math_log_funcs = {
+        Expression.log10: "Calculates the base 10 logarithm of the given 
value.",
+        Expression.log2: "Calculates the base 2 logarithm of the given value.",
+        Expression.ln: "Calculates the natural logarithm of the given value.",
+        Expression.log: "Calculates the natural logarithm of the given value 
if base is not "
+                        "specified. Otherwise, calculates the logarithm of the 
given value to the "
+                        "given base.",
+    }
+
+    for func, op_desc in math_log_funcs.items():
+        func.__doc__ = _math_log_doc.format(op_desc=op_desc)
+
+
+def _make_math_trigonometric_doc():
+    math_trigonometric_funcs = {
+        Expression.cosh: "hyperbolic cosine",
+        Expression.sinh: "hyperbolic sine",
+        Expression.sin: "sine",
+        Expression.cos: "cosine",
+        Expression.tan: "tangent",
+        Expression.cot: "cotangent",
+        Expression.asin: "arc sine",
+        Expression.acos: "arc cosine",
+        Expression.atan: "arc tangent",
+        Expression.tanh: "hyperbolic tangent",
+    }
+
+    for func, op_desc in math_trigonometric_funcs.items():
+        func.__doc__ = _math_trigonometric_doc.format(op_desc=op_desc)
+
+
+def _make_aggregation_doc():
+    aggregation_funcs = {
+        Expression.sum: "Returns the sum of the numeric field across all input 
values. "
+                        "If all values are null, null is returned.",
+        Expression.sum0: "Returns the sum of the numeric field across all 
input values. "
+                        "If all values are null, 0 is returned.",
+        Expression.min: "Returns the minimum value of field across all input 
values.",
+        Expression.max: "Returns the maximum value of field across all input 
values.",
+        Expression.count: "Returns the number of input rows for which the 
field is not null.",
+        Expression.avg: "Returns the average (arithmetic mean) of the numeric 
field across all "
+                        "input values.",
+        Expression.stddev_pop: "Returns the population standard deviation of 
an expression(the "
+                               "square root of varPop()).",
+        Expression.stddev_samp: "Returns the sample standard deviation of an 
expression(the square "
+                                "root of varSamp()).",
+        Expression.var_pop: "Returns the population standard variance of an 
expression.",
+        Expression.var_samp: "Returns the sample variance of a given 
expression.",
+        Expression.collect: "Returns multiset aggregate of a given 
expression.",
+    }
+
+    for func, op_desc in aggregation_funcs.items():
+        func.__doc__ = _aggregation_doc.format(op_desc=op_desc)
+
+
+def _make_string_doc():
+    string_funcs = [
+        Expression.substring, Expression.trim_leading, 
Expression.trim_trailing, Expression.trim,
+        Expression.replace, Expression.char_length, Expression.upper_case, 
Expression.lower_case,
+        Expression.init_cap, Expression.like, Expression.similar, 
Expression.position,
+        Expression.lpad, Expression.rpad, Expression.overlay, 
Expression.regexp_replace,
+        Expression.regexp_extract, Expression.from_base64, 
Expression.to_base64,
+        Expression.ltrim, Expression.rtrim, Expression.repeat
+    ]
+
+    for func in string_funcs:
+        func.__doc__ = func.__doc__.replace('  ', '') + _string_doc_seealso
+
+
+def _make_temporal_doc():
+    temporal_funcs = [
+        Expression.to_date, Expression.to_time, Expression.to_timestamp, 
Expression.extract,
+        Expression.floor, Expression.ceil
+    ]
+
+    for func in temporal_funcs:
+        func.__doc__ = func.__doc__.replace('  ', '') + _temporal_doc_seealso
+
+
+def _make_time_doc():
+    time_funcs = {
+        Expression.year: "years",
+        Expression.years: "years",
+        Expression.quarter: "quarters",
+        Expression.quarters: "quarters",
+        Expression.month: "months",
+        Expression.months: "months",
+        Expression.week: "weeks",
+        Expression.weeks: "weeks",
+        Expression.day: "days",
+        Expression.days: "days",
+        Expression.hour: "hours",
+        Expression.hours: "hours",
+        Expression.minute: "minutes",
+        Expression.minutes: "minutes",
+        Expression.second: "seconds",
+        Expression.seconds: "seconds",
+        Expression.milli: "millis",
+        Expression.millis: "millis"
+    }
+
+    for func, op_desc in time_funcs.items():
+        func.__doc__ = _time_doc.format(op_desc=op_desc)
+
+
+def _make_hash_doc():
+    hash_funcs = {
+        Expression.md5: ("MD5", 32),
+        Expression.sha1: ("SHA-1", 40),
+        Expression.sha224: ("SHA-224", 56),
+        Expression.sha256: ("SHA-256", 64),
+        Expression.sha384: ("SHA-384", 96),
+        Expression.sha512: ("SHA-512", 128)
+    }
+
+    for func, (op_desc, bit) in hash_funcs.items():
+        func.__doc__ = _hash_doc.format(op_desc=op_desc, bit=bit)
+
+
+def _add_version_doc():
+    for func_name in dir(Expression):
+        if not func_name.startswith("_"):
+            add_version_doc(getattr(Expression, func_name), "1.12.0")
+
+
+def _get_java_expression(expr):
+    return expr._j_expr if isinstance(expr, Expression) else expr
+
+
+def _get_or_create_java_expression(expr: Union["Expression", str]):
+    if isinstance(expr, Expression):
+        return expr._j_expr
+    elif isinstance(expr, str):
+        from pyflink.table.expressions import col
+        return col(expr)._j_expr
+    else:
+        raise TypeError(
+            "Invalid argument: expected Expression or string, got 
{0}.".format(type(expr)))
+
+
+def _unary_op(op_name: str):
+    def _(self):
+        return Expression(getattr(self._j_expr, op_name)())
+
+    return _
+
+
+def _binary_op(op_name: str, reverse: bool = False):
+    def _(self, other):
+        if reverse:
+            return Expression(getattr(_get_java_expression(other), 
op_name)(self._j_expr))
+        else:
+            return Expression(getattr(self._j_expr, 
op_name)(_get_java_expression(other)))
+
+    return _
+
+
+def _ternary_op(op_name: str):
+    def _(self, first, second):
+        return Expression(getattr(self._j_expr, op_name)(
+            _get_java_expression(first), _get_java_expression(second)))
+
+    return _
+
+
+def _expressions_op(op_name: str):
+    def _(self, *args):
+        from pyflink.table import expressions
+        return getattr(expressions, op_name)(self, *[_get_java_expression(arg) 
for arg in args])
+
+    return _
+
+
+class TimeIntervalUnit(object):
+    """
+    Units for working with time intervals.
+
+    .. versionadded:: 1.12.0
+    """
+
+    YEAR = 0,
+    YEAR_TO_MONTH = 1,
+    QUARTER = 2,
+    MONTH = 3,
+    WEEK = 4,
+    DAY = 5,
+    DAY_TO_HOUR = 6,
+    DAY_TO_MINUTE = 7,
+    DAY_TO_SECOND = 8,
+    HOUR = 9,
+    SECOND = 10,
+    HOUR_TO_MINUTE = 11,
+    HOUR_TO_SECOND = 12,
+    MINUTE = 13,
+    MINUTE_TO_SECOND = 14
+
+    @staticmethod
+    def _from_j_time_interval_unit(j_time_interval_unit):
+        gateway = get_gateway()
+        JTimeIntervalUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimeIntervalUnit
+        if j_time_interval_unit == JTimeIntervalUnit.YEAR:
+            return TimeIntervalUnit.YEAR
+        elif j_time_interval_unit == JTimeIntervalUnit.YEAR_TO_MONTH:
+            return TimeIntervalUnit.YEAR_TO_MONTH
+        elif j_time_interval_unit == JTimeIntervalUnit.QUARTER:
+            return TimeIntervalUnit.QUARTER
+        elif j_time_interval_unit == JTimeIntervalUnit.MONTH:
+            return TimeIntervalUnit.MONTH
+        elif j_time_interval_unit == JTimeIntervalUnit.WEEK:
+            return TimeIntervalUnit.WEEK
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY:
+            return TimeIntervalUnit.DAY
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_HOUR:
+            return TimeIntervalUnit.DAY_TO_HOUR
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_MINUTE:
+            return TimeIntervalUnit.DAY_TO_MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.DAY_TO_SECOND:
+            return TimeIntervalUnit.DAY_TO_SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR:
+            return TimeIntervalUnit.HOUR
+        elif j_time_interval_unit == JTimeIntervalUnit.SECOND:
+            return TimeIntervalUnit.SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR_TO_MINUTE:
+            return TimeIntervalUnit.HOUR_TO_MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.HOUR_TO_SECOND:
+            return TimeIntervalUnit.HOUR_TO_SECOND
+        elif j_time_interval_unit == JTimeIntervalUnit.MINUTE:
+            return TimeIntervalUnit.MINUTE
+        elif j_time_interval_unit == JTimeIntervalUnit.MINUTE_TO_SECOND:
+            return TimeIntervalUnit.MINUTE_TO_SECOND
+        else:
+            raise Exception("Unsupported Java time interval unit: %s." % 
j_time_interval_unit)
+
+    @staticmethod
+    def _to_j_time_interval_unit(time_interval_unit):
+        gateway = get_gateway()
+        JTimeIntervalUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimeIntervalUnit
+        if time_interval_unit == TimeIntervalUnit.YEAR:
+            j_time_interval_unit = JTimeIntervalUnit.YEAR
+        elif time_interval_unit == TimeIntervalUnit.YEAR_TO_MONTH:
+            j_time_interval_unit = JTimeIntervalUnit.YEAR_TO_MONTH
+        elif time_interval_unit == TimeIntervalUnit.QUARTER:
+            j_time_interval_unit = JTimeIntervalUnit.QUARTER
+        elif time_interval_unit == TimeIntervalUnit.MONTH:
+            j_time_interval_unit = JTimeIntervalUnit.MONTH
+        elif time_interval_unit == TimeIntervalUnit.WEEK:
+            j_time_interval_unit = JTimeIntervalUnit.WEEK
+        elif time_interval_unit == TimeIntervalUnit.DAY:
+            j_time_interval_unit = JTimeIntervalUnit.DAY
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_HOUR:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_HOUR
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_MINUTE
+        elif time_interval_unit == TimeIntervalUnit.DAY_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.DAY_TO_SECOND
+        elif time_interval_unit == TimeIntervalUnit.HOUR:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR
+        elif time_interval_unit == TimeIntervalUnit.SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.SECOND
+        elif time_interval_unit == TimeIntervalUnit.HOUR_TO_MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR_TO_MINUTE
+        elif time_interval_unit == TimeIntervalUnit.HOUR_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.HOUR_TO_SECOND
+        elif time_interval_unit == TimeIntervalUnit.MINUTE:
+            j_time_interval_unit = JTimeIntervalUnit.MINUTE
+        elif time_interval_unit == TimeIntervalUnit.MINUTE_TO_SECOND:
+            j_time_interval_unit = JTimeIntervalUnit.MINUTE_TO_SECOND
+        else:
+            raise TypeError("Unsupported time interval unit: %s, supported 
time interval unit "
+                            "are: YEAR, YEAR_TO_MONTH, QUARTER, MONTH, WEEK, 
DAY, DAY_TO_HOUR, "
+                            "DAY_TO_MINUTE, DAY_TO_SECOND, HOUR, SECOND, 
HOUR_TO_MINUTE, "
+                            "HOUR_TO_SECOND, MINUTE, MINUTE_TO_SECOND" % 
time_interval_unit)
+        return j_time_interval_unit
+
+
+class TimePointUnit(object):
+    """
+    Units for working with points in time.
+
+    .. versionadded:: 1.12.0
+    """
+
+    YEAR = 0,
+    MONTH = 1,
+    DAY = 2,
+    HOUR = 3,
+    MINUTE = 4,
+    SECOND = 5,
+    QUARTER = 6,
+    WEEK = 7,
+    MILLISECOND = 8,
+    MICROSECOND = 9
+
+    @staticmethod
+    def _from_j_time_point_unit(j_time_point_unit):
+        gateway = get_gateway()
+        JTimePointUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimePointUnit
+        if j_time_point_unit == JTimePointUnit.YEAR:
+            return TimePointUnit.YEAR
+        elif j_time_point_unit == JTimePointUnit.MONTH:
+            return TimePointUnit.MONTH
+        elif j_time_point_unit == JTimePointUnit.DAY:
+            return TimePointUnit.DAY
+        elif j_time_point_unit == JTimePointUnit.HOUR:
+            return TimePointUnit.HOUR
+        elif j_time_point_unit == JTimePointUnit.MINUTE:
+            return TimePointUnit.MINUTE
+        elif j_time_point_unit == JTimePointUnit.SECOND:
+            return TimePointUnit.SECOND
+        elif j_time_point_unit == JTimePointUnit.QUARTER:
+            return TimePointUnit.QUARTER
+        elif j_time_point_unit == JTimePointUnit.WEEK:
+            return TimePointUnit.WEEK
+        elif j_time_point_unit == JTimePointUnit.MILLISECOND:
+            return TimePointUnit.MILLISECOND
+        elif j_time_point_unit == JTimePointUnit.MICROSECOND:
+            return TimePointUnit.MICROSECOND
+        else:
+            raise Exception("Unsupported Java time point unit: %s." % 
j_time_point_unit)
+
+    @staticmethod
+    def _to_j_time_point_unit(time_point_unit):
+        gateway = get_gateway()
+        JTimePointUnit = 
gateway.jvm.org.apache.flink.table.expressions.TimePointUnit
+        if time_point_unit == TimePointUnit.YEAR:
+            j_time_point_unit = JTimePointUnit.YEAR
+        elif time_point_unit == TimePointUnit.MONTH:
+            j_time_point_unit = JTimePointUnit.MONTH
+        elif time_point_unit == TimePointUnit.DAY:
+            j_time_point_unit = JTimePointUnit.DAY
+        elif time_point_unit == TimePointUnit.HOUR:
+            j_time_point_unit = JTimePointUnit.HOUR
+        elif time_point_unit == TimePointUnit.MINUTE:
+            j_time_point_unit = JTimePointUnit.MINUTE
+        elif time_point_unit == TimePointUnit.SECOND:
+            j_time_point_unit = JTimePointUnit.SECOND
+        elif time_point_unit == TimePointUnit.QUARTER:
+            j_time_point_unit = JTimePointUnit.QUARTER
+        elif time_point_unit == TimePointUnit.WEEK:
+            j_time_point_unit = JTimePointUnit.WEEK
+        elif time_point_unit == TimePointUnit.MILLISECOND:
+            j_time_point_unit = JTimePointUnit.MILLISECOND
+        elif time_point_unit == TimePointUnit.MICROSECOND:
+            j_time_point_unit = JTimePointUnit.MICROSECOND
+        else:
+            raise TypeError("Unsupported time point unit: %s, supported time 
point unit are: "
+                            "YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, QUARTER, "
+                            "WEEK, MILLISECOND, MICROSECOND" % time_point_unit)
+        return j_time_point_unit
+
+
+class Expression(object):
+    """
+    Expressions represent a logical tree for producing a computation result.
+    Expressions might be literal values, function calls, or field references.
+
+    .. versionadded:: 1.12.0
+    """
+
+    def __init__(self, j_expr_or_property_name):
+        self._j_expr_or_property_name = j_expr_or_property_name
+
+    __abs__ = _unary_op("abs")
+
+    # comparison functions
+    __eq__ = _binary_op("isEqual")
+    __ne__ = _binary_op("isNotEqual")
+    __lt__ = _binary_op("isLess")
+    __gt__ = _binary_op("isGreater")
+    __le__ = _binary_op("isLessOrEqual")
+    __ge__ = _binary_op("isGreaterOrEqual")
+
+    # logic functions
+    __and__ = _binary_op("and")
+    __or__ = _binary_op("or")
+    __invert__ = _unary_op('isNotTrue')
+
+    __rand__ = _binary_op("and")
+    __ror__ = _binary_op("or")
+
+    # arithmetic functions
+    __add__ = _binary_op("plus")
+    __sub__ = _binary_op("minus")
+    __mul__ = _binary_op("times")
+    __truediv__ = _binary_op("dividedBy")
+    __mod__ = _binary_op("mod")
+    __pow__ = _binary_op("power")
+    __neg__ = _expressions_op("negative")
+
+    __radd__ = _binary_op("plus")
+    __rsub__ = _binary_op("minus", True)
+    __rmul__ = _binary_op("times")
+    __rtruediv__ = _binary_op("dividedBy", True)
+    __rmod__ = _binary_op("mod", True)
+    __rpow__ = _binary_op("power", True)
+
+    def __str__(self):
+        return self._j_expr.asSummaryString()
+
+    def __getattr__(self, name):
+        if name == '_j_expr':
+            if isinstance(self._j_expr_or_property_name, str):
+                gateway = get_gateway()
+                return getattr(gateway.jvm.Expressions, 
self._j_expr_or_property_name)
+            else:
+                return self._j_expr_or_property_name
+        return self.get(name)
+
+    def __getitem__(self, index):
+        return self.at(index)
+
+    # ---------------------------- arithmetic functions 
----------------------------------
+
+    def exp(self):

Review comment:
       we can convert the no-argument function call to property, just like the 
elder string expression syntax and the scala expression dsl.

##########
File path: flink-python/pyflink/table/expressions.py
##########
@@ -0,0 +1,535 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink import add_version_doc
+from pyflink.java_gateway import get_gateway
+from pyflink.table.expression import Expression, _get_java_expression, 
TimePointUnit
+from pyflink.table.types import _to_java_data_type, DataType
+from pyflink.table.udf import UserDefinedFunctionWrapper
+from pyflink.util.utils import to_jarray
+
+
+__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 
'UNBOUNDED_ROW',
+           'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 
'current_time',
+           'current_timestamp', 'local_time', 'local_timestamp', 
'temporal_overlaps',
+           'date_format', 'timestamp_diff', 'array', 'row', 'map_', 
'row_interval', 'pi', 'e',
+           'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 
'uuid', 'null_of',
+           'log', 'with_columns', 'without_columns', 'call']
+
+
+def _leaf_op(op_name: str):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)())
+
+
+def _unary_op(op_name: str, arg):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, 
op_name)(_get_java_expression(arg)))
+
+
+def _binary_op(op_name: str, first, second):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second)))
+
+
+def _ternary_op(op_name: str, first, second, third):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third)))
+
+
+def _quaternion_op(op_name: str, first, second, third, forth):
+    gateway = get_gateway()
+    return Expression(getattr(gateway.jvm.Expressions, op_name)(
+        _get_java_expression(first),
+        _get_java_expression(second),
+        _get_java_expression(third),
+        _get_java_expression(forth)))
+
+
+def _add_version_doc():
+    from inspect import getmembers, isfunction
+    from pyflink.table import expressions
+    for o in getmembers(expressions):
+        if isfunction(o[1]) and not o[0].startswith('_'):
+            add_version_doc(o[1], "1.12.0")
+
+
+def col(name: str):
+    """
+    Creates an expression which refers to a table's field.
+
+    Example:
+    ::
+
+        >>> tab.select(col("key"), col("value"))
+
+    :param name: the field name to refer to
+    """
+    return _unary_op("$", name)
+
+
+def lit(v, data_type: DataType = None):

Review comment:
       Current implementation only support the basic type like int, str, 
bool... It would be better if we support more data type like BigDecimal, 
Timestamp and other atomic/complex datatypes. We can find all supported types 
according the Java class 
`org.apache.flink.table.types.utils.ValueDataTypeConverter`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to