This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ed394e  [FLINK-12609][python] Add 
LocalZonedTimestampType/ZonedTimestampType/DayTimeIntervalType/YearMonthIntervalType
0ed394e is described below

commit 0ed394ef0908a88502d198a90f49e04b123a4eda
Author: Dian Fu <[email protected]>
AuthorDate: Thu Jun 20 19:27:00 2019 +0800

    [FLINK-12609][python] Add 
LocalZonedTimestampType/ZonedTimestampType/DayTimeIntervalType/YearMonthIntervalType
    
    This closes #8847
---
 flink-python/pyflink/table/table_environment.py    |   4 +-
 flink-python/pyflink/table/tests/test_calc.py      |  32 +-
 flink-python/pyflink/table/tests/test_types.py     |  44 +-
 flink-python/pyflink/table/types.py                | 626 +++++++++++++++++++--
 flink-python/setup.py                              |   2 +-
 .../flink/table/util/python/PythonTableUtils.scala |   5 +
 6 files changed, 663 insertions(+), 50 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 4932861..bfb7f32 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -494,9 +494,11 @@ class TableEnvironment(object):
             raise TypeError(
                 "schema should be RowType, list, tuple or None, but got: %s" % 
schema)
 
+        # verifies the elements against the specified schema
+        elements = map(verify_obj, elements)
         # converts python data to sql data
         elements = [schema.to_sql_type(element) for element in elements]
-        return self._from_elements(map(verify_obj, elements), schema)
+        return self._from_elements(elements, schema)
 
     def _from_elements(self, elements, schema):
         """
diff --git a/flink-python/pyflink/table/tests/test_calc.py 
b/flink-python/pyflink/table/tests/test_calc.py
index edf430f..3e13450 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -63,20 +63,14 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
 
     def test_from_element(self):
         t_env = self.t_env
-        a = array.array('b')
-        a.fromstring('ABCD')
-        t = t_env.from_elements(
-            [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2), 
datetime.time(1, 0, 0),
-             datetime.datetime(1970, 1, 2, 0, 0), [1.0, None], 
array.array("d", [1.0, 2.0]),
-             ["abc"], [datetime.date(1970, 1, 2)], Decimal(1), Row("a", 
"b")(1, 2.0),
-             {"key": 1.0}, a, ExamplePoint(1.0, 2.0),
-             PythonOnlyPoint(3.0, 4.0))])
         field_names = ["a", "b", "c", "d", "e", "f", "g", "h",
-                       "i", "j", "k", "l", "m", "n", "o", "p", "q"]
+                       "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s"]
         field_types = [DataTypes.BIGINT(), DataTypes.DOUBLE(), 
DataTypes.STRING(),
                        DataTypes.STRING(), DataTypes.DATE(),
                        DataTypes.TIME(),
                        DataTypes.TIMESTAMP(),
+                       DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
+                       DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND()),
                        DataTypes.ARRAY(DataTypes.DOUBLE()),
                        DataTypes.ARRAY(DataTypes.DOUBLE(False)),
                        DataTypes.ARRAY(DataTypes.STRING()),
@@ -87,16 +81,28 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
                        DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()),
                        DataTypes.BYTES(), ExamplePointUDT(),
                        PythonOnlyUDT()]
+        schema = DataTypes.ROW(
+            list(map(lambda field_name, field_type: 
DataTypes.FIELD(field_name, field_type),
+                 field_names,
+                 field_types)))
         table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
         t_env.register_table_sink("Results", table_sink)
-
+        t = t_env.from_elements(
+            [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2), 
datetime.time(1, 0, 0),
+              datetime.datetime(1970, 1, 2, 0, 0), datetime.datetime(1970, 1, 
2, 0, 0),
+              datetime.timedelta(days=1, microseconds=10),
+              [1.0, None], array.array("d", [1.0, 2.0]),
+              ["abc"], [datetime.date(1970, 1, 2)], Decimal(1), Row("a", 
"b")(1, 2.0),
+              {"key": 1.0}, bytearray(b'ABCD'), ExamplePoint(1.0, 2.0),
+              PythonOnlyPoint(3.0, 4.0))],
+            schema)
         t.insert_into("Results")
         t_env.exec_env().execute()
         actual = source_sink_utils.results()
 
-        expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 
00:00:00.0,[1.0, null],'
-                    '[1.0, 2.0],[abc],[1970-01-02],1,1,2.0,{key=1.0},[65, 66, 
67, 68],[1.0, 2.0],'
-                    '[3.0, 4.0]']
+        expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
+                    '1970-01-02 00:00:00.0,86400000010,[1.0, null],[1.0, 
2.0],[abc],[1970-01-02],'
+                    '1,1,2.0,{key=1.0},[65, 66, 67, 68],[1.0, 2.0],[3.0, 4.0]']
         self.assert_equals(actual, expected)
 
 
diff --git a/flink-python/pyflink/table/tests/test_types.py 
b/flink-python/pyflink/table/tests/test_types.py
index 4888583..8972fa8 100644
--- a/flink-python/pyflink/table/tests/test_types.py
+++ b/flink-python/pyflink/table/tests/test_types.py
@@ -30,7 +30,7 @@ from pyflink.table.types import (_infer_schema_from_data, 
_infer_type,
                                  _array_type_mappings, _merge_type,
                                  _create_type_verifier, UserDefinedType, 
DataTypes, Row, RowField,
                                  RowType, ArrayType, BigIntType, VarCharType, 
MapType, DataType,
-                                 _to_java_type, _from_java_type)
+                                 _to_java_type, _from_java_type, 
ZonedTimestampType)
 
 
 class ExamplePointUDT(UserDefinedType):
@@ -105,6 +105,21 @@ class PythonOnlyPoint(ExamplePoint):
     __UDT__ = PythonOnlyUDT()
 
 
+class UTCOffsetTimezone(datetime.tzinfo):
+    """
+    Specifies timezone in UTC offset
+    """
+
+    def __init__(self, offset=0):
+        self.OFFSET = datetime.timedelta(hours=offset)
+
+    def utcoffset(self, dt):
+        return self.OFFSET
+
+    def dst(self, dt):
+        return self.OFFSET
+
+
 class TypesTests(unittest.TestCase):
 
     def test_infer_schema(self):
@@ -145,7 +160,7 @@ class TypesTests(unittest.TestCase):
             'VarCharType(2147483647, true)',
             'DateType(true)',
             'TimeType(0, true)',
-            'TimestampType(6, true)',
+            'LocalZonedTimestampType(6, true)',
             'DoubleType(true)',
             "ArrayType(DoubleType(false), true)",
             "ArrayType(BigIntType(true), true)",
@@ -523,6 +538,31 @@ class TypesTests(unittest.TestCase):
         tst = DataTypes.TIMESTAMP()
         self.assertEqual(tst.to_sql_type(datetime.datetime.max) % 1000000, 
999999)
 
+    def test_local_zoned_timestamp_type(self):
+        lztst = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+        ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000, 
tzinfo=UTCOffsetTimezone(1))
+        self.assertEqual(-3600000000, lztst.to_sql_type(ts))
+
+        if sys.version_info >= (3, 6):
+            ts2 = lztst.from_sql_type(-3600000000)
+            self.assertEqual(ts.astimezone(), ts2.astimezone())
+
+    def test_zoned_timestamp_type(self):
+        ztst = ZonedTimestampType()
+        ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000, 
tzinfo=UTCOffsetTimezone(1))
+        self.assertEqual((0, 3600), ztst.to_sql_type(ts))
+
+        ts2 = ztst.from_sql_type((0, 3600))
+        self.assertEqual(ts, ts2)
+
+    def test_day_time_inteval_type(self):
+        ymt = DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND())
+        td = datetime.timedelta(days=1, seconds=10)
+        self.assertEqual(86410000000, ymt.to_sql_type(td))
+
+        td2 = ymt.from_sql_type(86410000000)
+        self.assertEqual(td, td2)
+
     def test_empty_row(self):
         row = Row()
         self.assertEqual(len(row), 0)
diff --git a/flink-python/pyflink/table/types.py 
b/flink-python/pyflink/table/types.py
index c6092ab..af48049 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -387,7 +387,7 @@ class TimeType(AtomicType):
     :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10**6
+    EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
 
     def __init__(self, precision=0, nullable=True):
         super(TimeType, self).__init__(nullable)
@@ -401,20 +401,21 @@ class TimeType(AtomicType):
         return True
 
     def to_sql_type(self, t):
-        if t.tzinfo is not None:
-            offset = t.utcoffset()
-            offset = offset if offset else datetime.timedelta()
-            offset_microseconds =\
-                (offset.days * 86400 + offset.seconds) * 10 ** 6 + 
offset.microseconds
-        else:
-            offset_microseconds = self.EPOCH_ORDINAL
-        minutes = t.hour * 60 + t.minute
-        seconds = minutes * 60 + t.second
-        return seconds * 10**6 + t.microsecond - offset_microseconds
+        if t is not None:
+            if t.tzinfo is not None:
+                offset = t.utcoffset()
+                offset = offset if offset else datetime.timedelta()
+                offset_microseconds =\
+                    (offset.days * 86400 + offset.seconds) * 10 ** 6 + 
offset.microseconds
+            else:
+                offset_microseconds = self.EPOCH_ORDINAL
+            minutes = t.hour * 60 + t.minute
+            seconds = minutes * 60 + t.second
+            return seconds * 10 ** 6 + t.microsecond - offset_microseconds
 
     def from_sql_type(self, t):
         if t is not None:
-            seconds, microseconds = divmod(t, 10**6)
+            seconds, microseconds = divmod(t + self.EPOCH_ORDINAL, 10 ** 6)
             minutes, seconds = divmod(seconds, 60)
             hours, minutes = divmod(minutes, 60)
             return datetime.time(hours, minutes, seconds, microseconds)
@@ -422,7 +423,17 @@ class TimeType(AtomicType):
 
 class TimestampType(AtomicType):
     """
-    Timestamp data type.  SQL TIMESTAMP
+    Timestamp data type. SQL TIMESTAMP WITHOUT TIME ZONE.
+
+    Consisting of ``year-month-day hour:minute:second[.fractional]`` with up 
to nanosecond
+    precision and values ranging from ``0000-01-01 00:00:00.000000000`` to
+    ``9999-12-31 23:59:59.999999999``. Compared to the SQL standard, leap 
seconds (23:59:60 and
+    23:59:61) are not supported.
+
+    This class does not store or represent a time-zone. Instead, it is a 
description of
+    the date, as used for birthdays, combined with the local time as seen on a 
wall clock.
+    It cannot represent an instant on the time-line without additional 
information
+    such as an offset or time-zone.
 
     The precision must be greater than or equal to 0 and less than or equal to 
9.
 
@@ -445,12 +456,421 @@ class TimestampType(AtomicType):
         if dt is not None:
             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                        else time.mktime(dt.timetuple()))
-            return int(seconds) * 10**6 + dt.microsecond
+            return int(seconds) * 10 ** 6 + dt.microsecond
+
+    def from_sql_type(self, ts):
+        if ts is not None:
+            return datetime.datetime.fromtimestamp(ts // 10 ** 
6).replace(microsecond=ts % 10 ** 6)
+
+
+class LocalZonedTimestampType(AtomicType):
+    """
+    Timestamp data type. SQL TIMESTAMP WITH LOCAL TIME ZONE.
+
+    Consisting of ``year-month-day hour:minute:second[.fractional] zone`` with 
up to nanosecond
+    precision and values ranging from ``0000-01-01 00:00:00.000000000 +14:59`` 
to
+    ``9999-12-31 23:59:59.999999999 -14:59``. Compared to the SQL standard, 
leap seconds (23:59:60
+    and 23:59:61) are not supported.
+
+    The value will be stored internally as a long value which stores all date 
and time
+    fields, to a precision of nanoseconds, as well as the offset from 
UTC/Greenwich.
+
+    The precision must be greater than or equal to 0 and less than or equal to 
9.
+
+    :param precision: int, the number of digits of fractional seconds 
(default: 6)
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, precision=6, nullable=True):
+        super(LocalZonedTimestampType, self).__init__(nullable)
+        assert 0 <= precision <= 9
+        self.precision = precision
+
+    def __repr__(self):
+        return "LocalZonedTimestampType(%s, %s)" % (self.precision, 
str(self._nullable).lower())
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, dt):
+        if dt is not None:
+            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+                       else time.mktime(dt.timetuple()))
+            return int(seconds) * 10 ** 6 + dt.microsecond
+
+    def from_sql_type(self, ts):
+        if ts is not None:
+            return datetime.datetime.fromtimestamp(ts // 10 ** 
6).replace(microsecond=ts % 10 ** 6)
+
+
+class ZonedTimestampType(AtomicType):
+    """
+    Timestamp data type with time zone. SQL TIMESTAMP WITH TIME ZONE.
+
+    Consisting of ``year-month-day hour:minute:second[.fractional] zone`` with 
up to nanosecond
+    precision and values ranging from ``0000-01-01 00:00:00.000000000 
+14:59`` to
+    ``9999-12-31 23:59:59.999999999 -14:59``. Compared to the SQL standard, 
leap seconds (23:59:60
+    and 23:59:61) are not supported.
+
+    The value will be stored internally with all date and time fields, to a 
precision of
+    nanoseconds, and a time-zone, with a zone offset used to handle ambiguous 
local date-times.
+
+    The precision must be greater than or equal to 0 and less than or equal to 
9.
+
+    :param precision: int, the number of digits of fractional seconds 
(default: 6)
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, precision=6, nullable=True):
+        super(ZonedTimestampType, self).__init__(nullable)
+        assert 0 <= precision <= 9
+        self.precision = precision
+
+    def __repr__(self):
+        return "ZonedTimestampType(%s, %s)" % (self.precision, 
str(self._nullable).lower())
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, dt):
+        if dt is not None:
+            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+                       else time.mktime(dt.timetuple()))
+            tzinfo = dt.tzinfo if dt.tzinfo else datetime.datetime.now(
+                datetime.timezone.utc).astimezone().tzinfo
+            offset = int(tzinfo.utcoffset(dt).total_seconds())
+            return int(seconds + offset) * 10 ** 6 + dt.microsecond, offset
+
+    def from_sql_type(self, zoned_ts):
+        if zoned_ts is not None:
+            from dateutil import tz
+            ts = zoned_ts[0] - zoned_ts[1] * 10 ** 6
+            tzinfo = tz.tzoffset(None, zoned_ts[1])
+            return datetime.datetime.fromtimestamp(ts // 10 ** 6, 
tz=tzinfo).replace(
+                microsecond=ts % 10 ** 6)
+
+
+class Resolution(object):
+    """
+    Helper class for defining the resolution of an interval.
+
+    :param unit: value defined in the constants of :class:`IntervalUnit`.
+    :param precision: the number of digits of years (=year precision) or the 
number of digits of
+                      days (=day precision) or the number of digits of 
fractional seconds (
+                      =fractional precision).
+    """
+
+    class IntervalUnit(object):
+        SECOND = 0
+        MINUTE = 1
+        HOUR = 2
+        DAY = 3
+        MONTH = 4
+        YEAR = 5
+
+    def __init__(self, unit, precision=-1):
+        self._unit = unit
+        self._precision = precision
+
+    @property
+    def unit(self):
+        return self._unit
+
+    @property
+    def precision(self):
+        return self._precision
+
+    def __str__(self):
+        return '%s(%s)' % (str(self._unit), str(self._precision))
+
+
+class YearMonthIntervalType(AtomicType):
+    """
+    Year-month interval types. The type must be parameterized to one of the 
following
+    resolutions: interval of years, interval of years to months, or interval 
of months.
+
+    An interval of year-month consists of ``+years-months`` with values 
ranging from ``-9999-11``
+    to ``+9999-11``. The value representation is the same for all types of 
resolutions. For
+    example, an interval of months of 50 is always represented in an 
interval-of-years-to-months
+    format (with default year precision): ``+04-02``.
+
+    :param resolution: value defined in the constants of 
:class:`YearMonthResolution`,
+                       representing one of the following resolutions: interval 
of years,
+                       interval of years to months, or interval of months.
+    :param precision: int, the number of digits of years, must have a value
+                      between 1 and 4 (both inclusive), default (2).
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    class YearMonthResolution(object):
+        """
+        Supported resolutions of :class:`YearMonthIntervalType`.
+        """
+        YEAR = 1
+        MONTH = 2
+        YEAR_TO_MONTH = 3
+
+    DEFAULT_PRECISION = 2
+
+    def __init__(self, resolution, precision=DEFAULT_PRECISION, nullable=True):
+        assert resolution == YearMonthIntervalType.YearMonthResolution.YEAR or 
\
+            resolution == YearMonthIntervalType.YearMonthResolution.MONTH or \
+            resolution == 
YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH
+        assert resolution != YearMonthIntervalType.YearMonthResolution.MONTH 
or \
+            precision == self.DEFAULT_PRECISION
+        assert 1 <= precision <= 4
+        self._resolution = resolution
+        self._precision = precision
+        super(YearMonthIntervalType, self).__init__(nullable)
+
+    @property
+    def resolution(self):
+        return self._resolution
+
+    @property
+    def precision(self):
+        return self._precision
+
+
+class DayTimeIntervalType(AtomicType):
+    """
+    Day-time interval types. The type must be parameterized to one of the 
following resolutions
+    with up to nanosecond precision: interval of days, interval of days to 
hours, interval of
+    days to minutes, interval of days to seconds, interval of hours, interval 
of hours to minutes,
+    interval of hours to seconds, interval of minutes, interval of minutes to 
seconds,
+    or interval of seconds.
+
+    An interval of day-time consists of ``+days 
hours:minutes:seconds.fractional`` with values
+    ranging from ``-999999 23:59:59.999999999`` to ``+999999 
23:59:59.999999999``. The value
+    representation is the same for all types of resolutions. For example, an 
interval of seconds
+    of 70 is always represented in an interval-of-days-to-seconds format (with 
default precisions):
+    ``+00 00:01:10.000000``.
+
+    :param resolution: value defined in the constants of 
:class:`DayTimeResolution`,
+                       representing one of the following resolutions: interval 
of days, interval
+                       of days to hours, interval of days to minutes, interval 
of days to seconds,
+                       interval of hours, interval of hours to minutes, 
interval of hours to
+                       seconds, interval of minutes, interval of minutes to 
seconds, or interval
+                       of seconds.
+    :param day_precision: the number of digits of days, must have a value 
between 1 and 6 (both
+                          inclusive) (default 2).
+    :param fractional_precision: the number of digits of fractional seconds, 
must have a value
+                                 between 0 and 9 (both inclusive) (default 6).
+    """
+
+    class DayTimeResolution(object):
+        """
+        Supported resolutions of :class:`DayTimeIntervalType`.
+        """
+        DAY = 1
+        DAY_TO_HOUR = 2
+        DAY_TO_MINUTE = 3
+        DAY_TO_SECOND = 4
+        HOUR = 5
+        HOUR_TO_MINUTE = 6
+        HOUR_TO_SECOND = 7
+        MINUTE = 8
+        MINUTE_TO_SECOND = 9
+        SECOND = 10
+
+    DEFAULT_DAY_PRECISION = 2
+    DEFAULT_FRACTIONAL_PRECISION = 6
+
+    def __init__(self, resolution, day_precision=DEFAULT_DAY_PRECISION,
+                 fractional_precision=DEFAULT_FRACTIONAL_PRECISION, 
nullable=True):
+        assert resolution == DayTimeIntervalType.DayTimeResolution.DAY or \
+            resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR or 
\
+            resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE 
or \
+            resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND 
or \
+            resolution == DayTimeIntervalType.DayTimeResolution.HOUR or \
+            resolution == DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE 
or \
+            resolution == DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND 
or \
+            resolution == DayTimeIntervalType.DayTimeResolution.MINUTE or \
+            resolution == 
DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND or \
+            resolution == DayTimeIntervalType.DayTimeResolution.SECOND
+
+        assert not self._needs_default_day_precision(
+            resolution) or day_precision == self.DEFAULT_DAY_PRECISION
+        assert not self._needs_default_fractional_precision(
+            resolution) or fractional_precision == 
self.DEFAULT_FRACTIONAL_PRECISION
+        assert 1 <= day_precision <= 6
+        assert 0 <= fractional_precision <= 9
+        self._resolution = resolution
+        self._day_precision = day_precision
+        self._fractional_precision = fractional_precision
+        super(DayTimeIntervalType, self).__init__(nullable)
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, timedelta):
+        if timedelta is not None:
+            return (timedelta.days * 86400 + timedelta.seconds) * 10 ** 6 + 
timedelta.microseconds
 
     def from_sql_type(self, ts):
         if ts is not None:
-            # using int to avoid precision loss in float
-            return datetime.datetime.fromtimestamp(ts // 
10**6).replace(microsecond=ts % 10**6)
+            return datetime.timedelta(microseconds=ts)
+
+    @property
+    def resolution(self):
+        return self._resolution
+
+    @property
+    def day_precision(self):
+        return self._day_precision
+
+    @property
+    def fractional_precision(self):
+        return self._fractional_precision
+
+    @staticmethod
+    def _needs_default_day_precision(resolution):
+        if resolution == DayTimeIntervalType.DayTimeResolution.HOUR or \
+                resolution == 
DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE or \
+                resolution == 
DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND or \
+                resolution == DayTimeIntervalType.DayTimeResolution.MINUTE or \
+                resolution == 
DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND or \
+                resolution == DayTimeIntervalType.DayTimeResolution.SECOND:
+            return True
+        else:
+            return False
+
+    @staticmethod
+    def _needs_default_fractional_precision(resolution):
+        if resolution == DayTimeIntervalType.DayTimeResolution.DAY or \
+                resolution == 
DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR or \
+                resolution == 
DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE or \
+                resolution == DayTimeIntervalType.DayTimeResolution.HOUR or \
+                resolution == 
DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE or \
+                resolution == DayTimeIntervalType.DayTimeResolution.MINUTE:
+            return True
+        else:
+            return False
+
+
+_resolution_mappings = {
+    (Resolution.IntervalUnit.YEAR, None):
+        lambda p1, p2: YearMonthIntervalType(
+            YearMonthIntervalType.YearMonthResolution.YEAR, p1),
+    (Resolution.IntervalUnit.MONTH, None):
+        lambda p1, p2: YearMonthIntervalType(
+            YearMonthIntervalType.YearMonthResolution.MONTH),
+    (Resolution.IntervalUnit.YEAR, Resolution.IntervalUnit.MONTH):
+        lambda p1, p2: YearMonthIntervalType(
+            YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH),
+    (Resolution.IntervalUnit.DAY, None):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.DAY,
+            p1,
+            DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION),
+    (Resolution.IntervalUnit.DAY, Resolution.IntervalUnit.HOUR):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR,
+            p1,
+            DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION),
+    (Resolution.IntervalUnit.DAY, Resolution.IntervalUnit.MINUTE):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE,
+            p1,
+            DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION),
+    (Resolution.IntervalUnit.DAY, Resolution.IntervalUnit.SECOND):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, p1, p2),
+    (Resolution.IntervalUnit.HOUR, None):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.HOUR),
+    (Resolution.IntervalUnit.HOUR, Resolution.IntervalUnit.MINUTE):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE),
+    (Resolution.IntervalUnit.HOUR, Resolution.IntervalUnit.SECOND):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND,
+            DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+            p2),
+    (Resolution.IntervalUnit.MINUTE, None):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.MINUTE),
+    (Resolution.IntervalUnit.MINUTE, Resolution.IntervalUnit.SECOND):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND,
+            DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+            p2),
+    (Resolution.IntervalUnit.SECOND, None):
+        lambda p1, p2: DayTimeIntervalType(
+            DayTimeIntervalType.DayTimeResolution.SECOND,
+            DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+            p1)
+}
+
+
+def _from_resolution(upper_resolution, lower_resolution=None):
+    """
+    Creates an interval type (YearMonthIntervalType or DayTimeIntervalType) 
from the
+    upper_resolution and lower_resolution.
+    """
+    lower_unit = None if lower_resolution is None else lower_resolution.unit
+    lower_precision = -1 if lower_resolution is None else 
lower_resolution.precision
+    interval_type_provider = _resolution_mappings[(upper_resolution.unit, 
lower_unit)]
+    if interval_type_provider is None:
+        raise ValueError(
+            "Unsupported interval definition '%s TO %s'. Please check the 
documentation for "
+            "supported combinations for year-month and day-time intervals."
+            % (upper_resolution, lower_resolution))
+
+    return interval_type_provider(upper_resolution.precision, lower_precision)
+
+
+def _from_java_interval_type(j_interval_type):
+    """
+    Creates an interval type from the specified Java interval type.
+
+    :param j_interval_type: the Java interval type.
+    :return: :class:`YearMonthIntervalType` or :class:`DayTimeIntervalType`.
+    """
+    gateway = get_gateway()
+    if _is_instance_of(j_interval_type, gateway.jvm.YearMonthIntervalType):
+        resolution = j_interval_type.getResolution()
+        precision = j_interval_type.getYearPrecision()
+
+        def _from_java_year_month_resolution(j_resolution):
+            if j_resolution == 
gateway.jvm.YearMonthIntervalType.YearMonthResolution.YEAR:
+                return YearMonthIntervalType.YearMonthResolution.YEAR
+            elif j_resolution == 
gateway.jvm.YearMonthIntervalType.YearMonthResolution.MONTH:
+                return YearMonthIntervalType.YearMonthResolution.MONTH
+            else:
+                return YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH
+
+        return 
YearMonthIntervalType(_from_java_year_month_resolution(resolution), precision)
+
+    else:
+        resolution = j_interval_type.getResolution()
+        day_precision = j_interval_type.getDayPrecision()
+        fractional_precision = j_interval_type.getFractionalPrecision()
+
+        def _from_java_day_time_resolution(j_resolution):
+            if j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY:
+                return DayTimeIntervalType.DayTimeResolution.DAY
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR:
+                return DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE:
+                return DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND:
+                return DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.HOUR:
+                return DayTimeIntervalType.DayTimeResolution.HOUR
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE:
+                return DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND:
+                return DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.MINUTE:
+                return DayTimeIntervalType.DayTimeResolution.MINUTE
+            elif j_resolution == 
gateway.jvm.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND:
+                return DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND
+            else:
+                return DayTimeIntervalType.DayTimeResolution.SECOND
+
+        return DayTimeIntervalType(
+            _from_java_day_time_resolution(resolution), day_precision, 
fractional_precision)
 
 
 _boxed_to_primitive_array_map = \
@@ -889,7 +1309,7 @@ _type_mappings = {
     bytearray: VarBinaryType(0x7fffffff),
     decimal.Decimal: DecimalType(38, 18),
     datetime.date: DateType(),
-    datetime.datetime: TimestampType(),
+    datetime.datetime: LocalZonedTimestampType(),
     datetime.time: TimeType(),
 }
 
@@ -1219,6 +1639,7 @@ def _to_java_type(data_type):
                 DateType: Types.SQL_DATE(),
                 TimeType: Types.SQL_TIME(),
                 TimestampType: Types.SQL_TIMESTAMP(),
+                LocalZonedTimestampType: Types.SQL_TIMESTAMP(),
                 CharType: Types.STRING(),
                 VarCharType: Types.STRING(),
                 BinaryType: Types.PRIMITIVE_ARRAY(Types.BYTE()),
@@ -1234,6 +1655,14 @@ def _to_java_type(data_type):
     elif type(data_type) in _python_java_types_mapping:
         return _python_java_types_mapping[type(data_type)]
 
+    # YearMonthIntervalType
+    elif isinstance(data_type, YearMonthIntervalType):
+        return Types.INTERVAL_MONTHS()
+
+    # DayTimeIntervalType
+    elif isinstance(data_type, DayTimeIntervalType):
+        return Types.INTERVAL_MILLIS()
+
     # ArrayType
     elif isinstance(data_type, ArrayType):
         if type(data_type.element_type) in _primitive_array_element_types:
@@ -1321,6 +1750,8 @@ def _from_java_type(j_data_type):
             data_type = DataTypes.DECIMAL(logical_type.getPrecision(),
                                           logical_type.getScale(),
                                           logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.DateType):
+            data_type = DataTypes.DATE(logical_type.isNullable())
         elif _is_instance_of(logical_type, gateway.jvm.TimeType):
             data_type = DataTypes.TIME(logical_type.getPrecision(), 
logical_type.isNullable())
         elif _is_instance_of(logical_type, gateway.jvm.TimestampType):
@@ -1339,26 +1770,15 @@ def _from_java_type(j_data_type):
             data_type = DataTypes.FLOAT(logical_type.isNullable())
         elif _is_instance_of(logical_type, gateway.jvm.DoubleType):
             data_type = DataTypes.DOUBLE(logical_type.isNullable())
-        elif _is_instance_of(logical_type, gateway.jvm.DateType):
-            data_type = DataTypes.DATE(logical_type.isNullable())
-        elif _is_instance_of(logical_type, gateway.jvm.TimeType):
-            data_type = DataTypes.TIME(logical_type.isNullable())
         elif _is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
             raise \
                 TypeError("Unsupported type: %s, ZonedTimestampType is not 
supported yet."
                           % j_data_type)
         elif _is_instance_of(logical_type, 
gateway.jvm.LocalZonedTimestampType):
-            raise \
-                TypeError("Unsupported type: %s, LocalZonedTimestampType is 
not supported "
-                          "currently." % j_data_type)
-        elif _is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType):
-            raise \
-                TypeError("Unsupported type: %s, DayTimeIntervalType is not 
supported yet."
-                          % j_data_type)
-        elif _is_instance_of(logical_type, gateway.jvm.YearMonthIntervalType):
-            raise \
-                TypeError("Unsupported type: %s, YearMonthIntervalType is not 
supported "
-                          "currently." % j_data_type)
+            data_type = 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(nullable=logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType) or 
\
+                _is_instance_of(logical_type, 
gateway.jvm.YearMonthIntervalType):
+            data_type = _from_java_interval_type(logical_type)
         elif _is_instance_of(logical_type, 
gateway.jvm.LegacyTypeInformationType):
             type_info = logical_type.getTypeInformation()
             BasicArrayTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.\
@@ -1607,6 +2027,9 @@ _acceptable_types = {
     DateType: (datetime.date, datetime.datetime),
     TimeType: (datetime.time,),
     TimestampType: (datetime.datetime,),
+    DayTimeIntervalType: (datetime.timedelta,),
+    LocalZonedTimestampType: (datetime.datetime,),
+    ZonedTimestampType: (datetime.datetime,),
     ArrayType: (list, tuple, array),
     MapType: (dict,),
     RowType: (tuple, list, dict),
@@ -2033,13 +2456,39 @@ class DataTypes(object):
         Compared to the SQL standard, leap seconds (``23:59:60`` and 
``23:59:61``)
         are not supported.
 
+        This class does not store or represent a time-zone. Instead, it is a 
description of
+        the date, as used for birthdays, combined with the local time as seen 
on a wall clock.
+        It cannot represent an instant on the time-line without additional 
information
+        such as an offset or time-zone.
+
         :param precision: int, the number of digits of fractional seconds.
-                          It must have a value between 0 and 9 (both 
inclusive).
+                          It must have a value between 0 and 9 (both 
inclusive). (default: 6)
         :param nullable: boolean, whether the type can be null (None) or not.
         """
         return TimestampType(precision, nullable)
 
     @staticmethod
+    def TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision=6, nullable=True):
+        """
+        Data type of a timestamp WITH LOCAL time zone.
+
+        An instance consists of year-month-day hour:minute:second[.fractional]
+        with up to nanosecond precision and values ranging from
+        ``0000-01-01 00:00:00.000000000 +14:59`` to ``9999-12-31 
23:59:59.999999999 -14:59``.
+
+        Compared to the SQL standard, leap seconds (``23:59:60`` and 
``23:59:61``)
+        are not supported.
+
+        The value will be stored internally as a long value which stores all 
date and time
+        fields, to a precision of nanoseconds, as well as the offset from 
UTC/Greenwich.
+
+        :param precision: int, the number of digits of fractional seconds.
+                          It must have a value between 0 and 9 (both 
inclusive). (default: 6)
+        :param nullable: boolean, whether the type can be null (None) or not.
+        """
+        return LocalZonedTimestampType(precision, nullable)
+
+    @staticmethod
     def ARRAY(element_type, nullable=True):
         """
         Data type of an array of elements with same subtype.
@@ -2111,3 +2560,114 @@ class DataTypes(object):
         :param description: string, description of the field.
         """
         return RowField(name, data_type, description)
+
+    @staticmethod
+    def SECOND(precision=DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION):
+        """
+        Resolution in seconds and (possibly) fractional seconds.
+
+        :param precision: int, the number of digits of fractional seconds. It 
must have a value
+                          between 0 and 9 (both inclusive), (default: 6).
+        :return: the specified :class:`Resolution`.
+
+        .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+        """
+        return Resolution(Resolution.IntervalUnit.SECOND, precision)
+
+    @staticmethod
+    def MINUTE():
+        """
+        Resolution in minutes.
+
+        :return: the specified :class:`Resolution`.
+
+        .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+        """
+        return Resolution(Resolution.IntervalUnit.MINUTE)
+
+    @staticmethod
+    def HOUR():
+        """
+        Resolution in hours.
+
+        :return: the specified :class:`Resolution`.
+
+        .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+        """
+        return Resolution(Resolution.IntervalUnit.HOUR)
+
+    @staticmethod
+    def DAY(precision=DayTimeIntervalType.DEFAULT_DAY_PRECISION):
+        """
+        Resolution in days.
+
+        :param precision: int, the number of digits of days. It must have a 
value between 1 and
+                          6 (both inclusive), (default: 2).
+        :return: the specified :class:`Resolution`.
+
+        .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+        """
+        return Resolution(Resolution.IntervalUnit.DAY, precision)
+
+    @staticmethod
+    def MONTH():
+        """
+        Resolution in months.
+
+        :return: the specified :class:`Resolution`.
+
+        .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+        """
+        return Resolution(Resolution.IntervalUnit.MONTH)
+
+    @staticmethod
+    def YEAR(precision=YearMonthIntervalType.DEFAULT_PRECISION):
+        """
+        Resolution in years with 2 digits for the number of years by default.
+
+        :param precision: the number of digits of years. It must have a value 
between 1 and
+                          4 (both inclusive), (default: 2).
+        :return: the specified :class:`Resolution`.
+
+        .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+        """
+        return Resolution(Resolution.IntervalUnit.YEAR, precision)
+
+    @staticmethod
+    def INTERVAL(upper_resolution, lower_resolution=None):
+        """
+        Data type of a temporal interval. There are two types of temporal 
intervals: day-time
+        intervals with up to nanosecond granularity or year-month intervals 
with up to month
+        granularity.
+
+        An interval of day-time consists of ``+days 
hours:minutes:seconds.fractional`` with values
+        ranging from ``-999999 23:59:59.999999999`` to ``+999999 
23:59:59.999999999``. The type
+        must be parameterized to one of the following resolutions: interval of 
days, interval of
+        days to hours, interval of days to minutes, interval of days to 
seconds, interval of hours,
+        interval of hours to minutes, interval of hours to seconds, interval 
of minutes,
+        interval of minutes to seconds, or interval of seconds. The value 
representation is the
+        same for all types of resolutions. For example, an interval of seconds 
of 70 is always
+        represented in an interval-of-days-to-seconds format (with default 
precisions):
+        ``+00 00:01:10.000000``.
+
+        An interval of year-month consists of ``+years-months`` with values 
ranging from
+        ``-9999-11`` to ``+9999-11``. The type must be parameterized to one of 
the following
+        resolutions: interval of years, interval of years to months, or 
interval of months. The
+        value representation is the same for all types of resolutions. For 
example, an interval
+        of months of 50 is always represented in an 
interval-of-years-to-months format (with
+        default year precision): ``+04-02``.
+
+        Examples: ``INTERVAL(DAY(2), SECOND(9))`` for a day-time interval or
+        ``INTERVAL(YEAR(4), MONTH())`` for a year-month interval.
+
+        :param upper_resolution: :class:`Resolution`, the upper resolution of 
the interval.
+        :param lower_resolution: :class:`Resolution`, the lower resolution of 
the interval.
+
+        .. seealso:: :func:`~pyflink.table.DataTypes.SECOND`
+        .. seealso:: :func:`~pyflink.table.DataTypes.MINUTE`
+        .. seealso:: :func:`~pyflink.table.DataTypes.HOUR`
+        .. seealso:: :func:`~pyflink.table.DataTypes.DAY`
+        .. seealso:: :func:`~pyflink.table.DataTypes.MONTH`
+        .. seealso:: :func:`~pyflink.table.DataTypes.YEAR`
+        """
+        return _from_resolution(upper_resolution, lower_resolution)
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 3580eac..906ebf9 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -55,7 +55,7 @@ setup(
     license='http://www.apache.org/licenses/LICENSE-2.0',
     author='Flink Developers',
     author_email='[email protected]',
-    install_requires=['py4j==0.10.8.1'],
+    install_requires=['py4j==0.10.8.1', 'python-dateutil'],
     tests_require=['pytest==4.4.1'],
     description='Apache Flink Python API',
     long_description=long_description,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
index 73524f0..1ac2423 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
@@ -149,6 +149,11 @@ object PythonTableUtils {
       case c: Int => new Timestamp(c.toLong / 1000)
     }
 
+    case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) => 
nullSafeConvert(obj) {
+      case c: Long => c / 1000
+      case c: Int => c.toLong / 1000
+    }
+
     case _ if dataType == Types.STRING => (obj: Any) => nullSafeConvert(obj) {
       case _ => obj.toString
     }

Reply via email to