This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0ed394e [FLINK-12609][python] Add
LocalZonedTimestampType/ZonedTimestampType/DayTimeIntervalType/YearMonthIntervalType
0ed394e is described below
commit 0ed394ef0908a88502d198a90f49e04b123a4eda
Author: Dian Fu <[email protected]>
AuthorDate: Thu Jun 20 19:27:00 2019 +0800
[FLINK-12609][python] Add
LocalZonedTimestampType/ZonedTimestampType/DayTimeIntervalType/YearMonthIntervalType
This closes #8847
---
flink-python/pyflink/table/table_environment.py | 4 +-
flink-python/pyflink/table/tests/test_calc.py | 32 +-
flink-python/pyflink/table/tests/test_types.py | 44 +-
flink-python/pyflink/table/types.py | 626 +++++++++++++++++++--
flink-python/setup.py | 2 +-
.../flink/table/util/python/PythonTableUtils.scala | 5 +
6 files changed, 663 insertions(+), 50 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py
b/flink-python/pyflink/table/table_environment.py
index 4932861..bfb7f32 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -494,9 +494,11 @@ class TableEnvironment(object):
raise TypeError(
"schema should be RowType, list, tuple or None, but got: %s" %
schema)
+ # verifies the elements against the specified schema
+ elements = map(verify_obj, elements)
# converts python data to sql data
elements = [schema.to_sql_type(element) for element in elements]
- return self._from_elements(map(verify_obj, elements), schema)
+ return self._from_elements(elements, schema)
def _from_elements(self, elements, schema):
"""
diff --git a/flink-python/pyflink/table/tests/test_calc.py
b/flink-python/pyflink/table/tests/test_calc.py
index edf430f..3e13450 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -63,20 +63,14 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
def test_from_element(self):
t_env = self.t_env
- a = array.array('b')
- a.fromstring('ABCD')
- t = t_env.from_elements(
- [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2),
datetime.time(1, 0, 0),
- datetime.datetime(1970, 1, 2, 0, 0), [1.0, None],
array.array("d", [1.0, 2.0]),
- ["abc"], [datetime.date(1970, 1, 2)], Decimal(1), Row("a",
"b")(1, 2.0),
- {"key": 1.0}, a, ExamplePoint(1.0, 2.0),
- PythonOnlyPoint(3.0, 4.0))])
field_names = ["a", "b", "c", "d", "e", "f", "g", "h",
- "i", "j", "k", "l", "m", "n", "o", "p", "q"]
+ "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s"]
field_types = [DataTypes.BIGINT(), DataTypes.DOUBLE(),
DataTypes.STRING(),
DataTypes.STRING(), DataTypes.DATE(),
DataTypes.TIME(),
DataTypes.TIMESTAMP(),
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
+ DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND()),
DataTypes.ARRAY(DataTypes.DOUBLE()),
DataTypes.ARRAY(DataTypes.DOUBLE(False)),
DataTypes.ARRAY(DataTypes.STRING()),
@@ -87,16 +81,28 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()),
DataTypes.BYTES(), ExamplePointUDT(),
PythonOnlyUDT()]
+ schema = DataTypes.ROW(
+ list(map(lambda field_name, field_type:
DataTypes.FIELD(field_name, field_type),
+ field_names,
+ field_types)))
table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
t_env.register_table_sink("Results", table_sink)
-
+ t = t_env.from_elements(
+ [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2),
datetime.time(1, 0, 0),
+ datetime.datetime(1970, 1, 2, 0, 0), datetime.datetime(1970, 1,
2, 0, 0),
+ datetime.timedelta(days=1, microseconds=10),
+ [1.0, None], array.array("d", [1.0, 2.0]),
+ ["abc"], [datetime.date(1970, 1, 2)], Decimal(1), Row("a",
"b")(1, 2.0),
+ {"key": 1.0}, bytearray(b'ABCD'), ExamplePoint(1.0, 2.0),
+ PythonOnlyPoint(3.0, 4.0))],
+ schema)
t.insert_into("Results")
t_env.exec_env().execute()
actual = source_sink_utils.results()
- expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02
00:00:00.0,[1.0, null],'
- '[1.0, 2.0],[abc],[1970-01-02],1,1,2.0,{key=1.0},[65, 66,
67, 68],[1.0, 2.0],'
- '[3.0, 4.0]']
+ expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
+ '1970-01-02 00:00:00.0,86400000010,[1.0, null],[1.0,
2.0],[abc],[1970-01-02],'
+ '1,1,2.0,{key=1.0},[65, 66, 67, 68],[1.0, 2.0],[3.0, 4.0]']
self.assert_equals(actual, expected)
diff --git a/flink-python/pyflink/table/tests/test_types.py
b/flink-python/pyflink/table/tests/test_types.py
index 4888583..8972fa8 100644
--- a/flink-python/pyflink/table/tests/test_types.py
+++ b/flink-python/pyflink/table/tests/test_types.py
@@ -30,7 +30,7 @@ from pyflink.table.types import (_infer_schema_from_data,
_infer_type,
_array_type_mappings, _merge_type,
_create_type_verifier, UserDefinedType,
DataTypes, Row, RowField,
RowType, ArrayType, BigIntType, VarCharType,
MapType, DataType,
- _to_java_type, _from_java_type)
+ _to_java_type, _from_java_type,
ZonedTimestampType)
class ExamplePointUDT(UserDefinedType):
@@ -105,6 +105,21 @@ class PythonOnlyPoint(ExamplePoint):
__UDT__ = PythonOnlyUDT()
+class UTCOffsetTimezone(datetime.tzinfo):
+ """
+ Specifies timezone in UTC offset
+ """
+
+ def __init__(self, offset=0):
+ self.OFFSET = datetime.timedelta(hours=offset)
+
+ def utcoffset(self, dt):
+ return self.OFFSET
+
+ def dst(self, dt):
+ return self.OFFSET
+
+
class TypesTests(unittest.TestCase):
def test_infer_schema(self):
@@ -145,7 +160,7 @@ class TypesTests(unittest.TestCase):
'VarCharType(2147483647, true)',
'DateType(true)',
'TimeType(0, true)',
- 'TimestampType(6, true)',
+ 'LocalZonedTimestampType(6, true)',
'DoubleType(true)',
"ArrayType(DoubleType(false), true)",
"ArrayType(BigIntType(true), true)",
@@ -523,6 +538,31 @@ class TypesTests(unittest.TestCase):
tst = DataTypes.TIMESTAMP()
self.assertEqual(tst.to_sql_type(datetime.datetime.max) % 1000000,
999999)
+ def test_local_zoned_timestamp_type(self):
+ lztst = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000,
tzinfo=UTCOffsetTimezone(1))
+ self.assertEqual(-3600000000, lztst.to_sql_type(ts))
+
+ if sys.version_info >= (3, 6):
+ ts2 = lztst.from_sql_type(-3600000000)
+ self.assertEqual(ts.astimezone(), ts2.astimezone())
+
+ def test_zoned_timestamp_type(self):
+ ztst = ZonedTimestampType()
+ ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000,
tzinfo=UTCOffsetTimezone(1))
+ self.assertEqual((0, 3600), ztst.to_sql_type(ts))
+
+ ts2 = ztst.from_sql_type((0, 3600))
+ self.assertEqual(ts, ts2)
+
+ def test_day_time_inteval_type(self):
+ ymt = DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND())
+ td = datetime.timedelta(days=1, seconds=10)
+ self.assertEqual(86410000000, ymt.to_sql_type(td))
+
+ td2 = ymt.from_sql_type(86410000000)
+ self.assertEqual(td, td2)
+
def test_empty_row(self):
row = Row()
self.assertEqual(len(row), 0)
diff --git a/flink-python/pyflink/table/types.py
b/flink-python/pyflink/table/types.py
index c6092ab..af48049 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -387,7 +387,7 @@ class TimeType(AtomicType):
:param nullable: boolean, whether the field can be null (None) or not.
"""
- EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10**6
+ EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
def __init__(self, precision=0, nullable=True):
super(TimeType, self).__init__(nullable)
@@ -401,20 +401,21 @@ class TimeType(AtomicType):
return True
def to_sql_type(self, t):
- if t.tzinfo is not None:
- offset = t.utcoffset()
- offset = offset if offset else datetime.timedelta()
- offset_microseconds =\
- (offset.days * 86400 + offset.seconds) * 10 ** 6 +
offset.microseconds
- else:
- offset_microseconds = self.EPOCH_ORDINAL
- minutes = t.hour * 60 + t.minute
- seconds = minutes * 60 + t.second
- return seconds * 10**6 + t.microsecond - offset_microseconds
+ if t is not None:
+ if t.tzinfo is not None:
+ offset = t.utcoffset()
+ offset = offset if offset else datetime.timedelta()
+ offset_microseconds =\
+ (offset.days * 86400 + offset.seconds) * 10 ** 6 +
offset.microseconds
+ else:
+ offset_microseconds = self.EPOCH_ORDINAL
+ minutes = t.hour * 60 + t.minute
+ seconds = minutes * 60 + t.second
+ return seconds * 10 ** 6 + t.microsecond - offset_microseconds
def from_sql_type(self, t):
if t is not None:
- seconds, microseconds = divmod(t, 10**6)
+ seconds, microseconds = divmod(t + self.EPOCH_ORDINAL, 10 ** 6)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
return datetime.time(hours, minutes, seconds, microseconds)
@@ -422,7 +423,17 @@ class TimeType(AtomicType):
class TimestampType(AtomicType):
"""
- Timestamp data type. SQL TIMESTAMP
+ Timestamp data type. SQL TIMESTAMP WITHOUT TIME ZONE.
+
+ Consisting of ``year-month-day hour:minute:second[.fractional]`` with up
to nanosecond
+ precision and values ranging from ``0000-01-01 00:00:00.000000000`` to
+ ``9999-12-31 23:59:59.999999999``. Compared to the SQL standard, leap
seconds (23:59:60 and
+ 23:59:61) are not supported.
+
+ This class does not store or represent a time-zone. Instead, it is a
description of
+ the date, as used for birthdays, combined with the local time as seen on a
wall clock.
+ It cannot represent an instant on the time-line without additional
information
+ such as an offset or time-zone.
The precision must be greater than or equal to 0 and less than or equal to
9.
@@ -445,12 +456,421 @@ class TimestampType(AtomicType):
if dt is not None:
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
else time.mktime(dt.timetuple()))
- return int(seconds) * 10**6 + dt.microsecond
+ return int(seconds) * 10 ** 6 + dt.microsecond
+
+ def from_sql_type(self, ts):
+ if ts is not None:
+ return datetime.datetime.fromtimestamp(ts // 10 **
6).replace(microsecond=ts % 10 ** 6)
+
+
+class LocalZonedTimestampType(AtomicType):
+ """
+ Timestamp data type. SQL TIMESTAMP WITH LOCAL TIME ZONE.
+
+ Consisting of ``year-month-day hour:minute:second[.fractional] zone`` with
up to nanosecond
+ precision and values ranging from ``0000-01-01 00:00:00.000000000 +14:59``
to
+ ``9999-12-31 23:59:59.999999999 -14:59``. Compared to the SQL standard,
leap seconds (23:59:60
+ and 23:59:61) are not supported.
+
+ The value will be stored internally as a long value which stores all date
and time
+ fields, to a precision of nanoseconds, as well as the offset from
UTC/Greenwich.
+
+ The precision must be greater than or equal to 0 and less than or equal to
9.
+
+ :param precision: int, the number of digits of fractional seconds
(default: 6)
+ :param nullable: boolean, whether the field can be null (None) or not.
+ """
+
+ def __init__(self, precision=6, nullable=True):
+ super(LocalZonedTimestampType, self).__init__(nullable)
+ assert 0 <= precision <= 9
+ self.precision = precision
+
+ def __repr__(self):
+ return "LocalZonedTimestampType(%s, %s)" % (self.precision,
str(self._nullable).lower())
+
+ def need_conversion(self):
+ return True
+
+ def to_sql_type(self, dt):
+ if dt is not None:
+ seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+ else time.mktime(dt.timetuple()))
+ return int(seconds) * 10 ** 6 + dt.microsecond
+
+ def from_sql_type(self, ts):
+ if ts is not None:
+ return datetime.datetime.fromtimestamp(ts // 10 **
6).replace(microsecond=ts % 10 ** 6)
+
+
+class ZonedTimestampType(AtomicType):
+ """
+ Timestamp data type with time zone. SQL TIMESTAMP WITH TIME ZONE.
+
+ Consisting of ``year-month-day hour:minute:second[.fractional] zone`` with
up to nanosecond
+ precision and values ranging from ``0000-01-01 00:00:00.000000000
+14:59`` to
+ ``9999-12-31 23:59:59.999999999 -14:59``. Compared to the SQL standard,
leap seconds (23:59:60
+ and 23:59:61) are not supported.
+
+ The value will be stored internally as all date and time fields, to a
precision of
+ nanoseconds, and a time-zone, with a zone offset used to handle ambiguous
local date-times.
+
+ The precision must be greater than or equal to 0 and less than or equal to
9.
+
+ :param precision: int, the number of digits of fractional seconds
(default: 6)
+ :param nullable: boolean, whether the field can be null (None) or not.
+ """
+
+ def __init__(self, precision=6, nullable=True):
+ super(ZonedTimestampType, self).__init__(nullable)
+ assert 0 <= precision <= 9
+ self.precision = precision
+
+ def __repr__(self):
+ return "ZonedTimestampType(%s, %s)" % (self.precision,
str(self._nullable).lower())
+
+ def need_conversion(self):
+ return True
+
+ def to_sql_type(self, dt):
+ if dt is not None:
+ seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+ else time.mktime(dt.timetuple()))
+ tzinfo = dt.tzinfo if dt.tzinfo else datetime.datetime.now(
+ datetime.timezone.utc).astimezone().tzinfo
+ offset = int(tzinfo.utcoffset(dt).total_seconds())
+ return int(seconds + offset) * 10 ** 6 + dt.microsecond, offset
+
+ def from_sql_type(self, zoned_ts):
+ if zoned_ts is not None:
+ from dateutil import tz
+ ts = zoned_ts[0] - zoned_ts[1] * 10 ** 6
+ tzinfo = tz.tzoffset(None, zoned_ts[1])
+ return datetime.datetime.fromtimestamp(ts // 10 ** 6,
tz=tzinfo).replace(
+ microsecond=ts % 10 ** 6)
+
+
+class Resolution(object):
+ """
+ Helper class for defining the resolution of an interval.
+
+ :param unit: value defined in the constants of :class:`IntervalUnit`.
+ :param precision: the number of digits of years (=year precision) or the
number of digits of
+ days (=day precision) or the number of digits of
fractional seconds
+ (=fractional precision).
+ """
+
+ class IntervalUnit(object):
+ SECOND = 0
+ MINUTE = 1
+ HOUR = 2
+ DAY = 3
+ MONTH = 4
+ YEAR = 5
+
+ def __init__(self, unit, precision=-1):
+ self._unit = unit
+ self._precision = precision
+
+ @property
+ def unit(self):
+ return self._unit
+
+ @property
+ def precision(self):
+ return self._precision
+
+ def __str__(self):
+ return '%s(%s)' % (str(self._unit), str(self._precision))
+
+
+class YearMonthIntervalType(AtomicType):
+ """
+ Year-month interval types. The type must be parameterized to one of the
following
+ resolutions: interval of years, interval of years to months, or interval
of months.
+
+ An interval of year-month consists of ``+years-months`` with values
ranging from ``-9999-11``
+ to ``+9999-11``. The value representation is the same for all types of
resolutions. For
+ example, an interval of months of 50 is always represented in an
interval-of-years-to-months
+ format (with default year precision): ``+04-02``.
+
+ :param resolution: value defined in the constants of
:class:`YearMonthResolution`,
+ representing one of the following resolutions: interval
of years,
+ interval of years to months, or interval of months.
+ :param precision: int, the number of digits of years, must have a value
+ between 1 and 4 (both inclusive), default (2).
+ :param nullable: boolean, whether the field can be null (None) or not.
+ """
+
+ class YearMonthResolution(object):
+ """
+ Supported resolutions of :class:`YearMonthIntervalType`.
+ """
+ YEAR = 1
+ MONTH = 2
+ YEAR_TO_MONTH = 3
+
+ DEFAULT_PRECISION = 2
+
+ def __init__(self, resolution, precision=DEFAULT_PRECISION, nullable=True):
+ assert resolution == YearMonthIntervalType.YearMonthResolution.YEAR or
\
+ resolution == YearMonthIntervalType.YearMonthResolution.MONTH or \
+ resolution ==
YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH
+ assert resolution != YearMonthIntervalType.YearMonthResolution.MONTH
or \
+ precision == self.DEFAULT_PRECISION
+ assert 1 <= precision <= 4
+ self._resolution = resolution
+ self._precision = precision
+ super(YearMonthIntervalType, self).__init__(nullable)
+
+ @property
+ def resolution(self):
+ return self._resolution
+
+ @property
+ def precision(self):
+ return self._precision
+
+
+class DayTimeIntervalType(AtomicType):
+ """
+ Day-time interval types. The type must be parameterized to one of the
following resolutions
+ with up to nanosecond precision: interval of days, interval of days to
hours, interval of
+ days to minutes, interval of days to seconds, interval of hours, interval
of hours to minutes,
+ interval of hours to seconds, interval of minutes, interval of minutes to
seconds,
+ or interval of seconds.
+
+ An interval of day-time consists of ``+days
hours:minutes:seconds.fractional`` with values
+ ranging from ``-999999 23:59:59.999999999`` to ``+999999
23:59:59.999999999``. The value
+ representation is the same for all types of resolutions. For example, an
interval of seconds
+ of 70 is always represented in an interval-of-days-to-seconds format (with
default precisions):
+ ``+00 00:01:10.000000``.
+
+ :param resolution: value defined in the constants of
:class:`DayTimeResolution`,
+ representing one of the following resolutions: interval
of days, interval
+ of days to hours, interval of days to minutes, interval
of days to seconds,
+ interval of hours, interval of hours to minutes,
interval of hours to
+ seconds, interval of minutes, interval of minutes to
seconds, or interval
+ of seconds.
+ :param day_precision: the number of digits of days, must have a value
between 1 and 6 (both
+ inclusive) (default 2).
+ :param fractional_precision: the number of digits of fractional seconds,
must have a value
+ between 0 and 9 (both inclusive) (default 6).
+ """
+
+ class DayTimeResolution(object):
+ """
+ Supported resolutions of :class:`DayTimeIntervalType`.
+ """
+ DAY = 1
+ DAY_TO_HOUR = 2
+ DAY_TO_MINUTE = 3
+ DAY_TO_SECOND = 4
+ HOUR = 5
+ HOUR_TO_MINUTE = 6
+ HOUR_TO_SECOND = 7
+ MINUTE = 8
+ MINUTE_TO_SECOND = 9
+ SECOND = 10
+
+ DEFAULT_DAY_PRECISION = 2
+ DEFAULT_FRACTIONAL_PRECISION = 6
+
+ def __init__(self, resolution, day_precision=DEFAULT_DAY_PRECISION,
+ fractional_precision=DEFAULT_FRACTIONAL_PRECISION,
nullable=True):
+ assert resolution == DayTimeIntervalType.DayTimeResolution.DAY or \
+ resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR or
\
+ resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE
or \
+ resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND
or \
+ resolution == DayTimeIntervalType.DayTimeResolution.HOUR or \
+ resolution == DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE
or \
+ resolution == DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND
or \
+ resolution == DayTimeIntervalType.DayTimeResolution.MINUTE or \
+ resolution ==
DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND or \
+ resolution == DayTimeIntervalType.DayTimeResolution.SECOND
+
+ assert not self._needs_default_day_precision(
+ resolution) or day_precision == self.DEFAULT_DAY_PRECISION
+ assert not self._needs_default_fractional_precision(
+ resolution) or fractional_precision ==
self.DEFAULT_FRACTIONAL_PRECISION
+ assert 1 <= day_precision <= 6
+ assert 0 <= fractional_precision <= 9
+ self._resolution = resolution
+ self._day_precision = day_precision
+ self._fractional_precision = fractional_precision
+ super(DayTimeIntervalType, self).__init__(nullable)
+
+ def need_conversion(self):
+ return True
+
+ def to_sql_type(self, timedelta):
+ if timedelta is not None:
+ return (timedelta.days * 86400 + timedelta.seconds) * 10 ** 6 +
timedelta.microseconds
def from_sql_type(self, ts):
if ts is not None:
- # using int to avoid precision loss in float
- return datetime.datetime.fromtimestamp(ts //
10**6).replace(microsecond=ts % 10**6)
+ return datetime.timedelta(microseconds=ts)
+
+ @property
+ def resolution(self):
+ return self._resolution
+
+ @property
+ def day_precision(self):
+ return self._day_precision
+
+ @property
+ def fractional_precision(self):
+ return self._fractional_precision
+
+ @staticmethod
+ def _needs_default_day_precision(resolution):
+ if resolution == DayTimeIntervalType.DayTimeResolution.HOUR or \
+ resolution ==
DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE or \
+ resolution ==
DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND or \
+ resolution == DayTimeIntervalType.DayTimeResolution.MINUTE or \
+ resolution ==
DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND or \
+ resolution == DayTimeIntervalType.DayTimeResolution.SECOND:
+ return True
+ else:
+ return False
+
+ @staticmethod
+ def _needs_default_fractional_precision(resolution):
+ if resolution == DayTimeIntervalType.DayTimeResolution.DAY or \
+ resolution ==
DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR or \
+ resolution ==
DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE or \
+ resolution == DayTimeIntervalType.DayTimeResolution.HOUR or \
+ resolution ==
DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE or \
+ resolution == DayTimeIntervalType.DayTimeResolution.MINUTE:
+ return True
+ else:
+ return False
+
+
+_resolution_mappings = {
+ (Resolution.IntervalUnit.YEAR, None):
+ lambda p1, p2: YearMonthIntervalType(
+ YearMonthIntervalType.YearMonthResolution.YEAR, p1),
+ (Resolution.IntervalUnit.MONTH, None):
+ lambda p1, p2: YearMonthIntervalType(
+ YearMonthIntervalType.YearMonthResolution.MONTH),
+ (Resolution.IntervalUnit.YEAR, Resolution.IntervalUnit.MONTH):
+ lambda p1, p2: YearMonthIntervalType(
+ YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH),
+ (Resolution.IntervalUnit.DAY, None):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.DAY,
+ p1,
+ DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION),
+ (Resolution.IntervalUnit.DAY, Resolution.IntervalUnit.HOUR):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR,
+ p1,
+ DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION),
+ (Resolution.IntervalUnit.DAY, Resolution.IntervalUnit.MINUTE):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE,
+ p1,
+ DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION),
+ (Resolution.IntervalUnit.DAY, Resolution.IntervalUnit.SECOND):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, p1, p2),
+ (Resolution.IntervalUnit.HOUR, None):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.HOUR),
+ (Resolution.IntervalUnit.HOUR, Resolution.IntervalUnit.MINUTE):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE),
+ (Resolution.IntervalUnit.HOUR, Resolution.IntervalUnit.SECOND):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND,
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ p2),
+ (Resolution.IntervalUnit.MINUTE, None):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.MINUTE),
+ (Resolution.IntervalUnit.MINUTE, Resolution.IntervalUnit.SECOND):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND,
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ p2),
+ (Resolution.IntervalUnit.SECOND, None):
+ lambda p1, p2: DayTimeIntervalType(
+ DayTimeIntervalType.DayTimeResolution.SECOND,
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ p1)
+}
+
+
+def _from_resolution(upper_resolution, lower_resolution=None):
+ """
+ Creates an interval type (YearMonthIntervalType or DayTimeIntervalType)
from the
+ upper_resolution and lower_resolution.
+ """
+ lower_unit = None if lower_resolution is None else lower_resolution.unit
+ lower_precision = -1 if lower_resolution is None else
lower_resolution.precision
+ interval_type_provider = _resolution_mappings[(upper_resolution.unit,
lower_unit)]
+ if interval_type_provider is None:
+ raise ValueError(
+ "Unsupported interval definition '%s TO %s'. Please check the
documentation for "
+ "supported combinations for year-month and day-time intervals."
+ % (upper_resolution, lower_resolution))
+
+ return interval_type_provider(upper_resolution.precision, lower_precision)
+
+
+def _from_java_interval_type(j_interval_type):
+ """
+ Creates an interval type from the specified Java interval type.
+
+ :param j_interval_type: the Java interval type.
+ :return: :class:`YearMonthIntervalType` or :class:`DayTimeIntervalType`.
+ """
+ gateway = get_gateway()
+ if _is_instance_of(j_interval_type, gateway.jvm.YearMonthIntervalType):
+ resolution = j_interval_type.getResolution()
+ precision = j_interval_type.getYearPrecision()
+
+ def _from_java_year_month_resolution(j_resolution):
+ if j_resolution ==
gateway.jvm.YearMonthIntervalType.YearMonthResolution.YEAR:
+ return YearMonthIntervalType.YearMonthResolution.YEAR
+ elif j_resolution ==
gateway.jvm.YearMonthIntervalType.YearMonthResolution.MONTH:
+ return YearMonthIntervalType.YearMonthResolution.MONTH
+ else:
+ return YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH
+
+ return
YearMonthIntervalType(_from_java_year_month_resolution(resolution), precision)
+
+ else:
+ resolution = j_interval_type.getResolution()
+ day_precision = j_interval_type.getDayPrecision()
+ fractional_precision = j_interval_type.getFractionalPrecision()
+
+ def _from_java_day_time_resolution(j_resolution):
+ if j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY:
+ return DayTimeIntervalType.DayTimeResolution.DAY
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR:
+ return DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE:
+ return DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND:
+ return DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.HOUR:
+ return DayTimeIntervalType.DayTimeResolution.HOUR
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE:
+ return DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND:
+ return DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.MINUTE:
+ return DayTimeIntervalType.DayTimeResolution.MINUTE
+ elif j_resolution ==
gateway.jvm.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND:
+ return DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND
+ else:
+ return DayTimeIntervalType.DayTimeResolution.SECOND
+
+ return DayTimeIntervalType(
+ _from_java_day_time_resolution(resolution), day_precision,
fractional_precision)
_boxed_to_primitive_array_map = \
@@ -889,7 +1309,7 @@ _type_mappings = {
bytearray: VarBinaryType(0x7fffffff),
decimal.Decimal: DecimalType(38, 18),
datetime.date: DateType(),
- datetime.datetime: TimestampType(),
+ datetime.datetime: LocalZonedTimestampType(),
datetime.time: TimeType(),
}
@@ -1219,6 +1639,7 @@ def _to_java_type(data_type):
DateType: Types.SQL_DATE(),
TimeType: Types.SQL_TIME(),
TimestampType: Types.SQL_TIMESTAMP(),
+ LocalZonedTimestampType: Types.SQL_TIMESTAMP(),
CharType: Types.STRING(),
VarCharType: Types.STRING(),
BinaryType: Types.PRIMITIVE_ARRAY(Types.BYTE()),
@@ -1234,6 +1655,14 @@ def _to_java_type(data_type):
elif type(data_type) in _python_java_types_mapping:
return _python_java_types_mapping[type(data_type)]
+ # YearMonthIntervalType
+ elif isinstance(data_type, YearMonthIntervalType):
+ return Types.INTERVAL_MONTHS()
+
+ # DayTimeIntervalType
+ elif isinstance(data_type, DayTimeIntervalType):
+ return Types.INTERVAL_MILLIS()
+
# ArrayType
elif isinstance(data_type, ArrayType):
if type(data_type.element_type) in _primitive_array_element_types:
@@ -1321,6 +1750,8 @@ def _from_java_type(j_data_type):
data_type = DataTypes.DECIMAL(logical_type.getPrecision(),
logical_type.getScale(),
logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.DateType):
+ data_type = DataTypes.DATE(logical_type.isNullable())
elif _is_instance_of(logical_type, gateway.jvm.TimeType):
data_type = DataTypes.TIME(logical_type.getPrecision(),
logical_type.isNullable())
elif _is_instance_of(logical_type, gateway.jvm.TimestampType):
@@ -1339,26 +1770,15 @@ def _from_java_type(j_data_type):
data_type = DataTypes.FLOAT(logical_type.isNullable())
elif _is_instance_of(logical_type, gateway.jvm.DoubleType):
data_type = DataTypes.DOUBLE(logical_type.isNullable())
- elif _is_instance_of(logical_type, gateway.jvm.DateType):
- data_type = DataTypes.DATE(logical_type.isNullable())
- elif _is_instance_of(logical_type, gateway.jvm.TimeType):
- data_type = DataTypes.TIME(logical_type.isNullable())
elif _is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
raise \
TypeError("Unsupported type: %s, ZonedTimestampType is not
supported yet."
% j_data_type)
elif _is_instance_of(logical_type,
gateway.jvm.LocalZonedTimestampType):
- raise \
- TypeError("Unsupported type: %s, LocalZonedTimestampType is
not supported "
- "currently." % j_data_type)
- elif _is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType):
- raise \
- TypeError("Unsupported type: %s, DayTimeIntervalType is not
supported yet."
- % j_data_type)
- elif _is_instance_of(logical_type, gateway.jvm.YearMonthIntervalType):
- raise \
- TypeError("Unsupported type: %s, YearMonthIntervalType is not
supported "
- "currently." % j_data_type)
+ data_type =
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(nullable=logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType) or
\
+ _is_instance_of(logical_type,
gateway.jvm.YearMonthIntervalType):
+ data_type = _from_java_interval_type(logical_type)
elif _is_instance_of(logical_type,
gateway.jvm.LegacyTypeInformationType):
type_info = logical_type.getTypeInformation()
BasicArrayTypeInfo =
gateway.jvm.org.apache.flink.api.common.typeinfo.\
@@ -1607,6 +2027,9 @@ _acceptable_types = {
DateType: (datetime.date, datetime.datetime),
TimeType: (datetime.time,),
TimestampType: (datetime.datetime,),
+ DayTimeIntervalType: (datetime.timedelta,),
+ LocalZonedTimestampType: (datetime.datetime,),
+ ZonedTimestampType: (datetime.datetime,),
ArrayType: (list, tuple, array),
MapType: (dict,),
RowType: (tuple, list, dict),
@@ -2033,13 +2456,39 @@ class DataTypes(object):
Compared to the SQL standard, leap seconds (``23:59:60`` and
``23:59:61``)
are not supported.
+ This class does not store or represent a time-zone. Instead, it is a
description of
+ the date, as used for birthdays, combined with the local time as seen
on a wall clock.
+ It cannot represent an instant on the time-line without additional
information
+ such as an offset or time-zone.
+
:param precision: int, the number of digits of fractional seconds.
- It must have a value between 0 and 9 (both
inclusive).
+ It must have a value between 0 and 9 (both
inclusive). (default: 6)
:param nullable: boolean, whether the type can be null (None) or not.
"""
return TimestampType(precision, nullable)
@staticmethod
+ def TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision=6, nullable=True):
+ """
+ Data type of a timestamp WITH LOCAL time zone.
+
+ An instance consists of ``year-month-day hour:minute:second[.fractional] zone``
+ with up to nanosecond precision and values ranging from
+ ``0000-01-01 00:00:00.000000000 +14:59`` to ``9999-12-31
23:59:59.999999999 -14:59``.
+
+ Compared to the SQL standard, leap seconds (``23:59:60`` and
``23:59:61``)
+ are not supported.
+
+ The value will be stored internally as a long value which stores all
date and time
+ fields, to a precision of nanoseconds, as well as the offset from
UTC/Greenwich.
+
+ :param precision: int, the number of digits of fractional seconds.
+ It must have a value between 0 and 9 (both
inclusive). (default: 6)
+ :param nullable: boolean, whether the type can be null (None) or not.
+ """
+ return LocalZonedTimestampType(precision, nullable)
+
+ @staticmethod
def ARRAY(element_type, nullable=True):
"""
Data type of an array of elements with same subtype.
@@ -2111,3 +2560,114 @@ class DataTypes(object):
:param description: string, description of the field.
"""
return RowField(name, data_type, description)
+
+ @staticmethod
+ def SECOND(precision=DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION):
+ """
+ Resolution in seconds and (possibly) fractional seconds; a bound for ``INTERVAL``.
+
+ :param precision: int, the number of digits of fractional seconds. It
must have a value
+ between 0 and 9 (both inclusive) (default: 6).
+ :return: the specified :class:`Resolution`.
+
+ .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+ """
+ return Resolution(Resolution.IntervalUnit.SECOND, precision)
+
+ @staticmethod
+ def MINUTE():
+ """
+ Resolution in minutes. It takes no precision parameter.
+
+ :return: the specified :class:`Resolution`.
+
+ .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+ """
+ return Resolution(Resolution.IntervalUnit.MINUTE)
+
+ @staticmethod
+ def HOUR():
+ """
+ Resolution in hours. It takes no precision parameter.
+
+ :return: the specified :class:`Resolution`.
+
+ .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+ """
+ return Resolution(Resolution.IntervalUnit.HOUR)
+
+ @staticmethod
+ def DAY(precision=DayTimeIntervalType.DEFAULT_DAY_PRECISION):
+ """
+ Resolution in days.
+
+ :param precision: int, the number of digits of days. It must have a
value between 1 and
+ 6 (both inclusive) (default: 2).
+ :return: the specified :class:`Resolution`.
+
+ .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+ """
+ return Resolution(Resolution.IntervalUnit.DAY, precision)
+
+ @staticmethod
+ def MONTH():
+ """
+ Resolution in months. It takes no precision parameter.
+
+ :return: the specified :class:`Resolution`.
+
+ .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+ """
+ return Resolution(Resolution.IntervalUnit.MONTH)
+
+ @staticmethod
+ def YEAR(precision=YearMonthIntervalType.DEFAULT_PRECISION):
+ """
+ Resolution in years with 2 digits for the number of years by default.
+
+ :param precision: int, the number of digits of years. It must have a value
between 1 and
+ 4 (both inclusive) (default: 2).
+ :return: the specified :class:`Resolution`.
+
+ .. seealso:: :func:`~pyflink.table.DataTypes.INTERVAL`
+ """
+ return Resolution(Resolution.IntervalUnit.YEAR, precision)
+
+ @staticmethod
+ def INTERVAL(upper_resolution, lower_resolution=None):
+ """
+ Data type of a temporal interval. There are two types of temporal
intervals: day-time
+ intervals with up to nanosecond granularity or year-month intervals
with up to month
+ granularity.
+
+ An interval of day-time consists of ``+days
hours:minutes:seconds.fractional`` with values
+ ranging from ``-999999 23:59:59.999999999`` to ``+999999
23:59:59.999999999``. The type
+ must be parameterized to one of the following resolutions: interval of
days, interval of
+ days to hours, interval of days to minutes, interval of days to
seconds, interval of hours,
+ interval of hours to minutes, interval of hours to seconds, interval
of minutes,
+ interval of minutes to seconds, or interval of seconds. The value
representation is the
+ same for all types of resolutions. For example, an interval of seconds
of 70 is always
+ represented in an interval-of-days-to-seconds format (with default
precisions):
+ ``+00 00:01:10.000000``.
+
+ An interval of year-month consists of ``+years-months`` with values
ranging from
+ ``-9999-11`` to ``+9999-11``. The type must be parameterized to one of
the following
+ resolutions: interval of years, interval of years to months, or
interval of months. The
+ value representation is the same for all types of resolutions. For
example, an interval
+ of months of 50 is always represented in an
interval-of-years-to-months format (with
+ default year precision): ``+04-02``.
+
+ Examples: ``INTERVAL(DAY(2), SECOND(9))`` for a day-time interval or
+ ``INTERVAL(YEAR(4), MONTH())`` for a year-month interval.
+
+ :param upper_resolution: :class:`Resolution`, the upper resolution of
the interval.
+ :param lower_resolution: :class:`Resolution`, the lower resolution of
the interval. Optional (default: None).
+
+ .. seealso:: :func:`~pyflink.table.DataTypes.SECOND`
+ .. seealso:: :func:`~pyflink.table.DataTypes.MINUTE`
+ .. seealso:: :func:`~pyflink.table.DataTypes.HOUR`
+ .. seealso:: :func:`~pyflink.table.DataTypes.DAY`
+ .. seealso:: :func:`~pyflink.table.DataTypes.MONTH`
+ .. seealso:: :func:`~pyflink.table.DataTypes.YEAR`
+ """
+ return _from_resolution(upper_resolution, lower_resolution)
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 3580eac..906ebf9 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -55,7 +55,7 @@ setup(
license='http://www.apache.org/licenses/LICENSE-2.0',
author='Flink Developers',
author_email='[email protected]',
- install_requires=['py4j==0.10.8.1'],
+ install_requires=['py4j==0.10.8.1', 'python-dateutil'],
tests_require=['pytest==4.4.1'],
description='Apache Flink Python API',
long_description=long_description,
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
index 73524f0..1ac2423 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/python/PythonTableUtils.scala
@@ -149,6 +149,11 @@ object PythonTableUtils {
case c: Int => new Timestamp(c.toLong / 1000)
}
+ case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) =>
nullSafeConvert(obj) {
+ case c: Long => c / 1000
+ case c: Int => c.toLong / 1000
+ }
+
case _ if dataType == Types.STRING => (obj: Any) => nullSafeConvert(obj) {
case _ => obj.toString
}