This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a1c567cab145e410030f352de6581ece55f1089 Author: huangxingbo <[email protected]> AuthorDate: Thu Jul 22 10:17:52 2021 +0800 [FLINK-22911][python] Rework from_data_stream/create_temporary_view/to_data_stream of TableEnvironment in Python Table API This closes #16611. --- flink-python/pyflink/common/__init__.py | 5 +- flink-python/pyflink/common/time.py | 32 +++ flink-python/pyflink/common/typeinfo.py | 65 +++++ flink-python/pyflink/common/types.py | 10 + flink-python/pyflink/datastream/data_stream.py | 2 +- .../pyflink/fn_execution/coder_impl_fast.pxd | 6 + .../pyflink/fn_execution/coder_impl_fast.pyx | 29 +++ .../pyflink/fn_execution/coder_impl_slow.py | 26 ++ flink-python/pyflink/fn_execution/coders.py | 19 +- .../pyflink/fn_execution/flink_fn_execution_pb2.py | 54 ++-- .../pyflink/fn_execution/tests/test_coders.py | 8 +- .../pyflink/proto/flink-fn-execution.proto | 1 + flink-python/pyflink/table/table_environment.py | 288 ++++++++++++++++++--- .../table/tests/test_table_environment_api.py | 211 ++++++++++++++- flink-python/pyflink/table/types.py | 12 + flink-python/pyflink/table/utils.py | 4 +- .../flink/streaming/api/utils/PythonTypeUtils.java | 25 +- 17 files changed, 726 insertions(+), 71 deletions(-) diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py index daeec0e..d60c5e3 100644 --- a/flink-python/pyflink/common/__init__.py +++ b/flink-python/pyflink/common/__init__.py @@ -35,7 +35,7 @@ from pyflink.common.job_status import JobStatus from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration from pyflink.common.typeinfo import Types, TypeInformation from pyflink.common.types import Row, RowKind -from pyflink.common.time import Duration +from pyflink.common.time import Duration, Instant from pyflink.common.watermark_strategy import WatermarkStrategy __all__ = [ @@ -57,5 +57,6 @@ __all__ = [ "Duration", "WatermarkStrategy", "Types", - "TypeInformation" + "TypeInformation", + 
"Instant", ] diff --git a/flink-python/pyflink/common/time.py b/flink-python/pyflink/common/time.py index 9e2324d..41e4f0f 100644 --- a/flink-python/pyflink/common/time.py +++ b/flink-python/pyflink/common/time.py @@ -17,6 +17,8 @@ ################################################################################ from pyflink.java_gateway import get_gateway +__all__ = ['Duration', 'Instant'] + class Duration(object): """ @@ -51,3 +53,33 @@ class Duration(object): def __eq__(self, other): return isinstance(other, Duration) and self._j_duration.equals(other._j_duration) + + +class Instant(object): + """ + An instantaneous point on the time-line. Similar to Java.time.Instant. + """ + + def __init__(self, seconds, nanos): + self.seconds = seconds + self.nanos = nanos + + def to_epoch_milli(self): + if self.seconds < 0 < self.nanos: + return (self.seconds + 1) * 1000 + self.nanos // 1000_1000 - 1000 + else: + return self.seconds * 1000 + self.nanos // 1000_000 + + @staticmethod + def of_epoch_milli(epoch_milli: int) -> 'Instant': + secs = epoch_milli // 1000 + mos = epoch_milli % 1000 + return Instant(secs, mos * 1000_000) + + def __eq__(self, other): + return (self.__class__ == other.__class__ and + self.seconds == other.seconds and + self.nanos == other.nanos) + + def __repr__(self): + return 'Instant<{}, {}>'.format(self.seconds, self.nanos) diff --git a/flink-python/pyflink/common/typeinfo.py b/flink-python/pyflink/common/typeinfo.py index 5671ac9..66f7d1d 100644 --- a/flink-python/pyflink/common/typeinfo.py +++ b/flink-python/pyflink/common/typeinfo.py @@ -89,6 +89,7 @@ class BasicType(Enum): CHAR = "Char" BIG_INT = "BigInteger" BIG_DEC = "BigDecimal" + INSTANT = "Instant" class BasicTypeInfo(TypeInformation): @@ -126,6 +127,8 @@ class BasicTypeInfo(TypeInformation): self._j_typeinfo = JBasicTypeInfo.BIG_INT_TYPE_INFO elif self._basic_type == BasicType.BIG_DEC: self._j_typeinfo = JBasicTypeInfo.BIG_DEC_TYPE_INFO + elif self._basic_type == BasicType.INSTANT: + 
self._j_typeinfo = JBasicTypeInfo.INSTANT_TYPE_INFO else: raise TypeError("Invalid BasicType %s." % self._basic_type) return self._j_typeinfo @@ -182,6 +185,28 @@ class BasicTypeInfo(TypeInformation): def BIG_DEC_TYPE_INFO(): return BasicTypeInfo(BasicType.BIG_DEC) + @staticmethod + def INSTANT_TYPE_INFO(): + return InstantTypeInfo(BasicType.INSTANT) + + +class InstantTypeInfo(BasicTypeInfo): + """ + InstantTypeInfo enables users to get Instant TypeInfo. + """ + def __init__(self, basic_type: BasicType): + super(InstantTypeInfo, self).__init__(basic_type) + + def need_conversion(self): + return True + + def to_internal_type(self, obj): + return obj.to_epoch_milli() * 1000 + + def from_internal_type(self, obj): + from pyflink.common.time import Instant + return Instant.of_epoch_milli(obj // 1000) + class SqlTimeTypeInfo(TypeInformation): """ @@ -642,6 +667,30 @@ class MapTypeInfo(TypeInformation): return 'MapTypeInfo<{}, {}>'.format(self._key_type_info, self._value_type_info) +class ExternalTypeInfo(TypeInformation): + def __init__(self, type_info: TypeInformation): + super(ExternalTypeInfo, self).__init__() + self._type_info = type_info + + def get_java_type_info(self) -> JavaObject: + if not self._j_typeinfo: + gateway = get_gateway() + TypeInfoDataTypeConverter = \ + gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter + JExternalTypeInfo = \ + gateway.jvm.org.apache.flink.table.runtime.typeutils.ExternalTypeInfo + + j_data_type = TypeInfoDataTypeConverter.toDataType(self._type_info.get_java_type_info()) + self._j_typeinfo = JExternalTypeInfo.of(j_data_type) + return self._j_typeinfo + + def __eq__(self, other): + return self.__class__ == other.__class__ and self._type_info == other._type_info + + def __repr__(self): + return 'ExternalTypeInfo<{}>'.format(self._type_info) + + class Types(object): """ This class gives access to the type information of the most common types for which Flink has @@ -726,6 +775,13 @@ class Types(object): 
return BasicTypeInfo.BIG_DEC_TYPE_INFO() @staticmethod + def INSTANT() -> TypeInformation: + """ + Returns type information for Instant. Supports a None value. + """ + return BasicTypeInfo.INSTANT_TYPE_INFO() + + @staticmethod def SQL_DATE() -> TypeInformation: """ Returns type information for Date. Supports a None value. @@ -863,6 +919,8 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation: return Types.BIG_INT() elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_DEC_TYPE_INFO): return Types.BIG_DEC() + elif _is_instance_of(j_type_info, JBasicTypeInfo.INSTANT_TYPE_INFO): + return Types.INSTANT() JSqlTimeTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo if _is_instance_of(j_type_info, JSqlTimeTypeInfo.DATE): @@ -951,6 +1009,13 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation: j_element_type_info = j_type_info.getElementTypeInfo() return ListTypeInfo(_from_java_type(j_element_type_info)) + JExternalTypeInfo = gateway.jvm.org.apache.flink.table.runtime.typeutils.ExternalTypeInfo + if _is_instance_of(j_type_info, JExternalTypeInfo): + TypeInfoDataTypeConverter = \ + gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter + return ExternalTypeInfo(_from_java_type( + TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType()))) + raise TypeError("The java type info: %s is not supported in PyFlink currently." 
% j_type_info) diff --git a/flink-python/pyflink/common/types.py b/flink-python/pyflink/common/types.py index 608a301..19187af 100644 --- a/flink-python/pyflink/common/types.py +++ b/flink-python/pyflink/common/types.py @@ -29,6 +29,16 @@ class RowKind(Enum): UPDATE_AFTER = 2 DELETE = 3 + def __str__(self): + if self.value == RowKind.INSERT.value: + return '+I' + elif self.value == RowKind.UPDATE_BEFORE.value: + return '-U' + elif self.value == RowKind.UPDATE_AFTER.value: + return '+U' + else: + return '-D' + def _create_row(fields, values, row_kind: RowKind = None): row = Row(*values) diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py index 325eb59..04bbf7e 100644 --- a/flink-python/pyflink/datastream/data_stream.py +++ b/flink-python/pyflink/datastream/data_stream.py @@ -41,7 +41,7 @@ from pyflink.datastream.utils import convert_to_python_obj from pyflink.java_gateway import get_gateway -__all__ = ['CloseableIterator'] +__all__ = ['CloseableIterator', 'DataStream'] class DataStream(object): diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd index a6de3fc..de6c373 100644 --- a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd +++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd @@ -17,6 +17,8 @@ ################################################################################ # cython: language_level=3 +from libc.stdint cimport int32_t, int64_t + from pyflink.fn_execution.stream_fast cimport LengthPrefixInputStream, LengthPrefixOutputStream, \ InputStream, OutputStream @@ -136,6 +138,10 @@ cdef class TimestampCoderImpl(FieldCoderImpl): cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl): cdef object _timezone +cdef class InstantCoderImpl(FieldCoderImpl): + cdef int64_t _null_seconds + cdef int32_t _null_nanos + cdef class CloudPickleCoderImpl(FieldCoderImpl): pass diff --git 
a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx index 719756f..7bf6f64 100644 --- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx +++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx @@ -30,6 +30,7 @@ from typing import List, Union from cloudpickle import cloudpickle from pyflink.common import Row, RowKind +from pyflink.common.time import Instant from pyflink.datastream.window import CountWindow, TimeWindow ROW_KIND_BIT_SIZE = 2 @@ -646,6 +647,34 @@ cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl): cpdef decode_from_stream(self, InputStream in_stream, size_t size): return self._timezone.localize(self._decode_timestamp_data_from_stream(in_stream)) +cdef class InstantCoderImpl(FieldCoderImpl): + """ + A coder for Instant. + """ + + def __init__(self): + self._null_seconds = -9223372036854775808 + self._null_nanos = -2147483648 + + cpdef encode_to_stream(self, value, OutputStream out_stream): + if value is None: + out_stream.write_int64(self._null_seconds) + out_stream.write_int32(self._null_nanos) + else: + out_stream.write_int64(value.seconds) + out_stream.write_int32(value.nanos) + + cpdef decode_from_stream(self, InputStream in_stream, size_t size): + cdef int64_t seconds + cdef int32_t nanos + seconds = in_stream.read_int64() + nanos = in_stream.read_int32() + if seconds == self._null_seconds and nanos == self._null_nanos: + return None + else: + return Instant(seconds, nanos) + + cdef class CloudPickleCoderImpl(FieldCoderImpl): """ A coder used with cloudpickle for all kinds of python object. 
diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py b/flink-python/pyflink/fn_execution/coder_impl_slow.py index 22bb995..b9789b3 100644 --- a/flink-python/pyflink/fn_execution/coder_impl_slow.py +++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py @@ -25,6 +25,7 @@ import cloudpickle import pyarrow as pa from pyflink.common import Row, RowKind +from pyflink.common.time import Instant from pyflink.datastream.window import TimeWindow, CountWindow from pyflink.fn_execution.ResettableIO import ResettableIO from pyflink.fn_execution.stream_slow import InputStream, OutputStream @@ -589,6 +590,31 @@ class LocalZonedTimestampCoderImpl(TimestampCoderImpl): milliseconds, nanoseconds)) +class InstantCoderImpl(FieldCoderImpl): + """ + A coder for Instant. + """ + def __init__(self): + self._null_seconds = -9223372036854775808 + self._null_nanos = -2147483648 + + def encode_to_stream(self, value: Instant, out_stream: OutputStream): + if value is None: + out_stream.write_int64(self._null_seconds) + out_stream.write_int32(self._null_nanos) + else: + out_stream.write_int64(value.seconds) + out_stream.write_int32(value.nanos) + + def decode_from_stream(self, in_stream: InputStream, length: int = 0): + seconds = in_stream.read_int64() + nanos = in_stream.read_int32() + if seconds == self._null_seconds and nanos == self._null_nanos: + return None + else: + return Instant(seconds, nanos) + + class CloudPickleCoderImpl(FieldCoderImpl): """ A coder used with cloudpickle for all kinds of python object. 
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py index 42502bb..7ba4aed 100644 --- a/flink-python/pyflink/fn_execution/coders.py +++ b/flink-python/pyflink/fn_execution/coders.py @@ -24,7 +24,8 @@ import pytz from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, DateTypeInfo, \ TimeTypeInfo, TimestampTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, TupleTypeInfo, \ - MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, ObjectArrayTypeInfo + MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, ObjectArrayTypeInfo, \ + ExternalTypeInfo from pyflink.fn_execution import flink_fn_execution_pb2 from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \ FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \ @@ -37,7 +38,7 @@ except: __all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder', 'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder', 'BinaryCoder', 'CharCoder', - 'DateCoder', 'TimeCoder', 'TimestampCoder', 'LocalZonedTimestampCoder', + 'DateCoder', 'TimeCoder', 'TimestampCoder', 'LocalZonedTimestampCoder', 'InstantCoder', 'GenericArrayCoder', 'PrimitiveArrayCoder', 'MapCoder', 'DecimalCoder', 'BigDecimalCoder', 'TupleCoder', 'TimeWindowCoder', 'CountWindowCoder', 'PickleCoder', 'CloudPickleCoder', 'DataViewFilterCoder'] @@ -531,6 +532,14 @@ class LocalZonedTimestampCoder(FieldCoder): self.timezone == other.timezone) +class InstantCoder(FieldCoder): + """ + Coder for Instant. + """ + def get_impl(self) -> coder_impl.FieldCoderImpl: + return coder_impl.InstantCoderImpl() + + class CloudPickleCoder(FieldCoder): """ Coder used with cloudpickle to encode python object. 
@@ -665,7 +674,8 @@ _type_info_name_mappings = { type_info_name.SQL_DATE: DateCoder(), type_info_name.SQL_TIME: TimeCoder(), type_info_name.SQL_TIMESTAMP: TimestampCoder(3), - type_info_name.PICKLED_BYTES: CloudPickleCoder() + type_info_name.PICKLED_BYTES: CloudPickleCoder(), + type_info_name.INSTANT: InstantCoder() } @@ -708,6 +718,7 @@ _basic_type_info_mappings = { BasicType.STRING: CharCoder(), BasicType.CHAR: CharCoder(), BasicType.BIG_DEC: BigDecimalCoder(), + BasicType.INSTANT: InstantCoder() } @@ -746,5 +757,7 @@ def from_type_info(type_info: TypeInformation) -> FieldCoder: return RowCoder( [from_type_info(f) for f in type_info.get_field_types()], [f for f in type_info.get_field_names()]) + elif isinstance(type_info, ExternalTypeInfo): + return from_type_info(type_info._type_info) else: raise ValueError("Unsupported type_info %s." % type_info) diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py index e895969..6b1fd05 100644 --- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py +++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py @@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='flink-fn-execution.proto', package='org.apache.flink.fn_execution.v1', syntax='proto3', - serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...] 
+ serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 \x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...] ) @@ -335,11 +335,15 @@ _TYPEINFO_TYPENAME = _descriptor.EnumDescriptor( name='OBJECT_ARRAY', index=21, number=21, options=None, type=None), + _descriptor.EnumValueDescriptor( + name='INSTANT', index=22, number=22, + options=None, + type=None), ], containing_type=None, options=None, serialized_start=5660, - serialized_end=5955, + serialized_end=5968, ) _sym_db.RegisterEnumDescriptor(_TYPEINFO_TYPENAME) @@ -380,8 +384,8 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=6727, - serialized_end=6860, + serialized_start=6740, + serialized_end=6873, ) _sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE) @@ -402,8 +406,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=7827, - serialized_end=7859, + serialized_start=7840, + serialized_end=7872, ) _sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE) @@ -1718,7 +1722,7 @@ _TYPEINFO = _descriptor.Descriptor( index=0, containing_type=None, fields=[]), ], serialized_start=4838, - serialized_end=5968, + serialized_end=5981, ) @@ -1755,8 +1759,8 @@ _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=6343, - serialized_end=6385, + serialized_start=6356, + serialized_end=6398, ) _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor( @@ -1834,8 +1838,8 @@ 
_USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=6388, - serialized_end=6724, + serialized_start=6401, + serialized_end=6737, ) _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor( @@ -1900,8 +1904,8 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=5971, - serialized_end=6860, + serialized_start=5984, + serialized_end=6873, ) @@ -1931,8 +1935,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=7456, - serialized_end=7530, + serialized_start=7469, + serialized_end=7543, ) _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor( @@ -1961,8 +1965,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=7532, - serialized_end=7599, + serialized_start=7545, + serialized_end=7612, ) _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor( @@ -1991,8 +1995,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=7601, - serialized_end=7670, + serialized_start=7614, + serialized_end=7683, ) _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor( @@ -2021,8 +2025,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=7672, - serialized_end=7751, + serialized_start=7685, + serialized_end=7764, ) _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor( @@ -2051,8 +2055,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=7753, - serialized_end=7825, + serialized_start=7766, + serialized_end=7838, ) _CODERINFODESCRIPTOR = _descriptor.Descriptor( @@ -2127,8 +2131,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor( name='data_type', full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type', index=0, 
containing_type=None, fields=[]), ], - serialized_start=6863, - serialized_end=7872, + serialized_start=6876, + serialized_end=7885, ) _INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders.py index 1dbf396..e72bfaa 100644 --- a/flink-python/pyflink/fn_execution/tests/test_coders.py +++ b/flink-python/pyflink/fn_execution/tests/test_coders.py @@ -25,7 +25,7 @@ from pyflink.fn_execution.coders import BigIntCoder, TinyIntCoder, BooleanCoder, SmallIntCoder, IntCoder, FloatCoder, DoubleCoder, BinaryCoder, CharCoder, DateCoder, \ TimeCoder, TimestampCoder, GenericArrayCoder, MapCoder, DecimalCoder, FlattenRowCoder,\ RowCoder, LocalZonedTimestampCoder, BigDecimalCoder, TupleCoder, PrimitiveArrayCoder,\ - TimeWindowCoder, CountWindowCoder + TimeWindowCoder, CountWindowCoder, InstantCoder from pyflink.datastream.window import TimeWindow, CountWindow from pyflink.testing.test_case_utils import PyFlinkTestCase @@ -110,6 +110,12 @@ class CodersTest(PyFlinkTestCase): self.check_coder(coder, timezone.localize(datetime.datetime(2019, 9, 10, 18, 30, 20, 123456))) + def test_instant_coder(self): + from pyflink.common.time import Instant + + coder = InstantCoder() + self.check_coder(coder, Instant(100, 2000), None, Instant(-9223372036854775808, 0)) + def test_array_coder(self): element_coder = BigIntCoder() coder = GenericArrayCoder(element_coder) diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto b/flink-python/pyflink/proto/flink-fn-execution.proto index 1a6d6b6..d51998b 100644 --- a/flink-python/pyflink/proto/flink-fn-execution.proto +++ b/flink-python/pyflink/proto/flink-fn-execution.proto @@ -309,6 +309,7 @@ message TypeInfo { MAP = 19; PICKLED_BYTES = 20; OBJECT_ARRAY = 21; + INSTANT = 22; } message MapTypeInfo { diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 
77504b5..fcb9116 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -33,7 +33,7 @@ from pyflink.common import JobExecutionResult from pyflink.java_gateway import get_gateway from pyflink.serializers import BatchedSerializer, PickleSerializer from pyflink.table import Table, EnvironmentSettings, Expression, ExplainDetail, \ - Module, ModuleEntry, TableSink + Module, ModuleEntry, TableSink, Schema from pyflink.table.catalog import Catalog from pyflink.table.serializers import ArrowSerializer from pyflink.table.statement_set import StatementSet @@ -1171,23 +1171,133 @@ class TableEnvironment(object): else: self._j_tenv.registerFunction(name, java_function) - def create_temporary_view(self, view_path: str, table: Table): + def create_temporary_view(self, + view_path: str, + table_or_data_stream: Union[Table, DataStream], + *fields_or_schema: Union[str, Expression, Schema]): """ - Registers a :class:`~pyflink.table.Table` API object as a temporary view similar to SQL - temporary views. + 1. When table_or_data_stream is a :class:`~pyflink.table.Table`: - Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - it will be inaccessible in the current session. To make the permanent object available - again you can drop the corresponding temporary object. + Registers a :class:`~pyflink.table.Table` API object as a temporary view similar to SQL + temporary views. + + Temporary objects can shadow permanent ones. If a permanent object in a given path + exists, it will be inaccessible in the current session. To make the permanent object + available again you can drop the corresponding temporary object. + + 2. When table_or_data_stream is a :class:`~pyflink.datastream.DataStream`: + + 2.1 When fields_or_schema is a str or a sequence of :class:`~pyflink.table.Expression`: + + Creates a view from the given {@link DataStream} in a given path with specified + field names. 
Registered views can be referenced in SQL queries. + + 1. Reference input fields by name: All fields in the schema definition are + referenced by name (and possibly renamed using an alias (as). Moreover, we can + define proctime and rowtime attributes at arbitrary positions using arbitrary names + (except those that exist in the result schema). In this mode, fields can be + reordered and projected out. This mode can be used for any input type, including + POJOs. + + Example: + :: + + >>> stream = ... + # reorder the fields, rename the original 'f0' field to 'name' and add + # event-time attribute named 'rowtime' + + # use str + >>> table_env.create_temporary_view( + ... "cat.db.myTable", + ... stream, + ... "f1, rowtime.rowtime, f0 as 'name'") + + # or use a sequence of expression + >>> table_env.create_temporary_view( + ... "cat.db.myTable", + ... stream, + ... col("f1"), + ... col("rowtime").rowtime, + ... col("f0").alias('name')) + + 2. Reference input fields by position: In this mode, fields are simply renamed. + Event-time attributes can replace the field on their position in the input data + (if it is of correct type) or be appended at the end. Proctime attributes must be + appended at the end. This mode can only be used if the input type has a defined + field order (tuple, case class, Row) and none of the {@code fields} references a + field of the input type. + + Example: + :: + + >>> stream = ... + # rename the original fields to 'a' and 'b' and extract the internally attached + # timestamp into an event-time attribute named 'rowtime' + + # use str + >>> table_env.create_temporary_view( + ... "cat.db.myTable", stream, "a, b, rowtime.rowtime") + + # or use a sequence of expressions + >>> table_env.create_temporary_view( + ... "cat.db.myTable", + ... stream, + ... col("a"), + ... col("b"), + ... col("rowtime").rowtime) + + Temporary objects can shadow permanent ones. 
If a permanent object in a given path + exists, it will be inaccessible in the current session. To make the permanent object + available again you can drop the corresponding temporary object. + + 2.2 When fields_or_schema is a :class:`~pyflink.table.Schema`: + + Creates a view from the given {@link DataStream} in a given path. Registered views + can be referenced in SQL queries. + + See :func:`from_data_stream` for more information on how a + :class:`~pyflink.datastream.DataStream` is translated into a table. + + Temporary objects can shadow permanent ones. If a permanent object in a given path + exists, it will be inaccessible in the current session. To make the permanent object + available again you can drop the corresponding temporary object. + + .. note:: create_temporary_view by providing a Schema (case 2.) was added from flink + 1.14.0. :param view_path: The path under which the view will be registered. See also the :class:`~pyflink.table.TableEnvironment` class description for the format of the path. - :param table: The view to register. + :param table_or_data_stream: The Table or DataStream out of which to create the view. + :param fields_or_schema: The fields expressions(str) to map original fields of the + DataStream to the fields of the View or the customized schema for the final + table. .. 
versionadded:: 1.10.0 """ - self._j_tenv.createTemporaryView(view_path, table._j_table) + if isinstance(table_or_data_stream, Table): + self._j_tenv.createTemporaryView(view_path, table_or_data_stream._j_table) + elif len(fields_or_schema) == 0: + self._j_tenv.createTemporaryView(view_path, table_or_data_stream._j_data_stream) + elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], str): + self._j_tenv.createTemporaryView( + view_path, + table_or_data_stream._j_data_stream, + fields_or_schema[0]) + elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], Schema): + self._j_tenv.createTemporaryView( + view_path, + table_or_data_stream._j_data_stream, + fields_or_schema[0]._j_schema) + elif (len(fields_or_schema) > 0 and + all(isinstance(elem, Expression) for elem in fields_or_schema)): + self._j_tenv.createTemporaryView( + view_path, + table_or_data_stream._j_data_stream, + to_expression_jarray(fields_or_schema)) + else: + raise ValueError("Invalid arguments for 'fields': %r" % + ','.join([repr(item) for item in fields_or_schema])) def add_python_file(self, file_path: str): """ @@ -1725,27 +1835,115 @@ class StreamTableEnvironment(TableEnvironment): stream_execution_environment._j_stream_execution_environment) return StreamTableEnvironment(j_tenv) - def from_data_stream(self, data_stream: DataStream, *fields: Union[str, Expression]) -> Table: + def from_data_stream(self, + data_stream: DataStream, + *fields_or_schema: Union[str, Expression, Schema]) -> Table: """ - Converts the given DataStream into a Table with specified field names. + 1. When fields_or_schema is a str or a sequence of Expression: + + Converts the given DataStream into a Table with specified field names. + + There are two modes for mapping original fields to the fields of the Table: + + 1. Reference input fields by name: + + All fields in the schema definition are referenced by name (and possibly renamed + using and alias (as). 
Moreover, we can define proctime and rowtime attributes at + arbitrary positions using arbitrary names (except those that exist in the result + schema). In this mode, fields can be reordered and projected out. This mode can be + used for any input type. + + 2. Reference input fields by position: + + In this mode, fields are simply renamed. Event-time attributes can replace the field + on their position in the input data (if it is of correct type) or be appended at the + end. Proctime attributes must be appended at the end. This mode can only be used if + the input type has a defined field order (tuple, case class, Row) and none of the + fields references a field of the input type. + + 2. When fields_or_schema is a Schema: + + Converts the given DataStream into a Table. + + Column names and types of the Table are automatically derived from the TypeInformation + of the DataStream. If the outermost record's TypeInformation is a CompositeType, it will + be flattened in the first level. Composite nested fields will not be accessible. - There are two modes for mapping original fields to the fields of the Table: - 1. Reference input fields by name: - All fields in the schema definition are referenced by name (and possibly renamed using - and alias (as). Moreover, we can define proctime and rowtime attributes at arbitrary - positions using arbitrary names (except those that exist in the result schema). In this - mode, fields can be reordered and projected out. This mode can be used for any input - type. - 2. Reference input fields by position: - In this mode, fields are simply renamed. Event-time attributes can replace the field on - their position in the input data (if it is of correct type) or be appended at the end. - Proctime attributes must be appended at the end. This mode can only be used if the input - type has a defined field order (tuple, case class, Row) and none of the fields - references a field of the input type. 
+ Since the DataStream API does not support changelog processing natively, this method + assumes append-only/insert-only semantics during the stream-to-table conversion. Records + of class Row must describe RowKind.INSERT changes. + + By default, the stream record's timestamp and watermarks are not propagated unless + explicitly declared. + + This method allows to declare a Schema for the resulting table. The declaration is + similar to a {@code CREATE TABLE} DDL in SQL and allows to: + + 1. enrich or overwrite automatically derived columns with a custom DataType + 2. reorder columns + 3. add computed or metadata columns next to the physical columns + 4. access a stream record's timestamp + 5. declare a watermark strategy or propagate the DataStream watermarks + + It is possible to declare a schema without physical/regular columns. In this case, those + columns will be automatically derived and implicitly put at the beginning of the schema + declaration. + + The following examples illustrate common schema declarations and their semantics: + + Example: + :: + + === EXAMPLE 1 === + + no physical columns defined, they will be derived automatically, + e.g. BigDecimal becomes DECIMAL(38, 18) + + >>> Schema.new_builder() \ + ... .column_by_expression("c1", "f1 + 42") \ + ... .column_by_expression("c2", "f1 - 1") \ + ... .build() + + equal to: CREATE TABLE (f0 STRING, f1 DECIMAL(38, 18), c1 AS f1 + 42, c2 AS f1 - 1) + + === EXAMPLE 2 === + + physical columns defined, input fields and columns will be mapped by name, + columns are reordered and their data type overwritten, + all columns must be defined to show up in the final table's schema + + >>> Schema.new_builder() \ + ... .column("f1", "DECIMAL(10, 2)") \ + ... .column_by_expression("c", "f1 - 1") \ + ... .column("f0", "STRING") \ + ... 
.build() + + equal to: CREATE TABLE (f1 DECIMAL(10, 2), c AS f1 - 1, f0 STRING) + + === EXAMPLE 3 === + + timestamp and watermarks can be added from the DataStream API, + physical columns will be derived automatically + + >>> Schema.new_builder() \ + ... .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \ + ... .watermark("rowtime", "SOURCE_WATERMARK()") \ + ... .build() + + equal to: + CREATE TABLE ( + f0 STRING, + f1 DECIMAL(38, 18), + rowtime TIMESTAMP(3) METADATA, + WATERMARK FOR rowtime AS SOURCE_WATERMARK() + ) + + .. note:: create_temporary_view by providing a Schema (case 2.) was added from flink + 1.14.0. :param data_stream: The datastream to be converted. - :param fields: The fields expressions to map original fields of the DataStream to the fields - of the Table + :param fields_or_schema: The fields expressions to map original fields of the DataStream to + the fields of the Table or the customized schema for the final table. :return: The converted Table. .. versionadded:: 1.12.0 @@ -1753,17 +1951,43 @@ class StreamTableEnvironment(TableEnvironment): j_data_stream = data_stream._j_data_stream JPythonConfigUtil = get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil JPythonConfigUtil.configPythonOperator(j_data_stream.getExecutionEnvironment()) - if len(fields) == 0: + if len(fields_or_schema) == 0: return Table(j_table=self._j_tenv.fromDataStream(j_data_stream), t_env=self) - elif all(isinstance(f, Expression) for f in fields): + elif all(isinstance(f, Expression) for f in fields_or_schema): return Table(j_table=self._j_tenv.fromDataStream( - j_data_stream, to_expression_jarray(fields)), t_env=self) - elif len(fields) == 1 and isinstance(fields[0], str): + j_data_stream, to_expression_jarray(fields_or_schema)), t_env=self) + elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], str): warnings.warn( "Deprecated in 1.12. 
Use from_data_stream(DataStream, *Expression) instead.", DeprecationWarning) - return Table(j_table=self._j_tenv.fromDataStream(j_data_stream, fields[0]), t_env=self) - raise ValueError("Invalid arguments for 'fields': %r" % fields) + return Table(j_table=self._j_tenv.fromDataStream( + j_data_stream, fields_or_schema[0]), t_env=self) + elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], Schema): + return Table(j_table=self._j_tenv.fromDataStream( + j_data_stream, fields_or_schema[0]._j_schema), t_env=self) + raise ValueError("Invalid arguments for 'fields': %r" % fields_or_schema) + + def to_data_stream(self, table: Table) -> DataStream: + """ + Converts the given Table into a DataStream. + + Since the DataStream API does not support changelog processing natively, this method + assumes append-only/insert-only semantics during the table-to-stream conversion. The records + of class Row will always describe RowKind.INSERT changes. Updating tables are + not supported by this method and will produce an exception. + + Note that the type system of the table ecosystem is richer than the one of the DataStream + API. The table runtime will make sure to properly serialize the output records to the first + operator of the DataStream API. Afterwards, the Types semantics of the DataStream API + need to be considered. + + If the input table contains a single rowtime column, it will be propagated into a stream + record's timestamp. Watermarks will be propagated as well. + + :param table: The Table to convert. + :return: The converted DataStream.
+ """ + return DataStream(self._j_tenv.toDataStream(table._j_table)) def to_append_stream(self, table: Table, type_info: TypeInformation) -> DataStream: """ diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 1e85084..eaec37d 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -19,10 +19,16 @@ import datetime import decimal import sys from py4j.protocol import Py4JJavaError +from typing import Iterable -from pyflink.common import RowKind +from pyflink.common import RowKind, WatermarkStrategy +from pyflink.common.serializer import TypeSerializer from pyflink.common.typeinfo import Types +from pyflink.common.watermark_strategy import TimestampAssigner +from pyflink.datastream import MergingWindowAssigner, TimeWindow, Trigger, TriggerResult +from pyflink.datastream.functions import WindowFunction from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction +from pyflink.datastream.window import TimeWindowSerializer from pyflink.java_gateway import get_gateway from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, EnvironmentSettings, \ Module, ResultKind, ModuleEntry @@ -33,9 +39,9 @@ from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.types import RowType, Row from pyflink.table.udf import udf from pyflink.testing import source_sink_utils -from pyflink.testing.test_case_utils import \ - PyFlinkBatchTableTestCase, PyFlinkStreamTableTestCase, \ - _load_specific_flink_module_jars +from pyflink.testing.test_case_utils import ( + PyFlinkBatchTableTestCase, PyFlinkStreamTableTestCase, PyFlinkTestCase, + _load_specific_flink_module_jars, invoke_java_object_method) from pyflink.util.java_utils import get_j_env_configuration @@ -228,7 +234,46 @@ class TableEnvironmentTest(object): self.assertEqual("fake", 
table.get_options().get("connector")) -class DataStreamConversionTestCases(object): +class DataStreamConversionTestCases(PyFlinkTestCase): + + def setUp(self) -> None: + from pyflink.datastream import StreamExecutionEnvironment + + super(DataStreamConversionTestCases, self).setUp() + self.env = StreamExecutionEnvironment.get_execution_environment() + self.t_env = StreamTableEnvironment.create(self.env) + + self.env.set_parallelism(2) + config = invoke_java_object_method( + self.env._j_stream_execution_environment, "getConfiguration") + config.setString("akka.ask.timeout", "20 s") + self.t_env.get_config().get_configuration().set_string( + "python.fn-execution.bundle.size", "1") + self.test_sink = DataStreamTestSinkFunction() + + def test_from_data_stream_atomic(self): + data_stream = self.env.from_collection([(1,), (2,), (3,), (4,), (5,)]) + result = self.t_env.from_data_stream(data_stream).execute() + self.assertEqual("""( + `f0` RAW('[B', '...') +)""", + result._j_table_result.getResolvedSchema().toString()) + with result.collect() as result: + collected_result = [str(item) for item in result] + expected_result = [item for item in map(str, [Row(1), Row(2), Row(3), Row(4), Row(5)])] + expected_result.sort() + collected_result.sort() + self.assertEqual(expected_result, collected_result) + + def test_to_data_stream_atomic(self): + table = self.t_env.from_elements([(1,), (2,), (3,)], ["a"]) + ds = self.t_env.to_data_stream(table) + ds.add_sink(self.test_sink) + self.env.execute() + results = self.test_sink.get_results(False) + results.sort() + expected = ['+I[1]', '+I[2]', '+I[3]'] + self.assertEqual(expected, results) def test_from_data_stream(self): self.env.set_parallelism(1) @@ -258,6 +303,84 @@ class DataStreamConversionTestCases(object): result = source_sink_utils.results() self.assert_equals(result, expected) + def test_from_data_stream_with_schema(self): + from pyflink.table import Schema + + ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 
'Hi')], + type_info=Types.ROW_NAMED( + ["a", "b", "c"], + [Types.INT(), Types.STRING(), Types.STRING()])) + + table = self.t_env.from_data_stream(ds, + Schema.new_builder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.STRING()) + .build()) + result = table.execute() + with result.collect() as result: + collected_result = [str(item) for item in result] + expected_result = [item for item in + map(str, [Row(1, 'Hi', 'Hello'), Row(2, 'Hello', 'Hi')])] + expected_result.sort() + collected_result.sort() + self.assertEqual(expected_result, collected_result) + + def test_from_and_to_data_stream_event_time(self): + from pyflink.table import Schema + + ds = self.env.from_collection([(1, 42, "a"), (2, 5, "a"), (3, 1000, "c"), (100, 1000, "c")], + Types.ROW_NAMED( + ["a", "b", "c"], + [Types.LONG(), Types.INT(), Types.STRING()])) + ds = ds.assign_timestamps_and_watermarks( + WatermarkStrategy.for_monotonous_timestamps() + .with_timestamp_assigner(MyTimestampAssigner())) + + table = self.t_env.from_data_stream(ds, + Schema.new_builder() + .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") + .watermark("rowtime", "SOURCE_WATERMARK()") + .build()) + self.assertEqual("""( + `a` BIGINT, + `b` INT, + `c` STRING, + `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA, + WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK() +)""", + table._j_table.getResolvedSchema().toString()) + self.t_env.create_temporary_view("t", + ds, + Schema.new_builder() + .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") + .watermark("rowtime", "SOURCE_WATERMARK()") + .build()) + + result = self.t_env.execute_sql("SELECT " + "c, SUM(b) " + "FROM t " + "GROUP BY c, TUMBLE(rowtime, INTERVAL '0.005' SECOND)") + with result.collect() as result: + collected_result = [str(item) for item in result] + expected_result = [item for item in + map(str, [Row('a', 47), Row('c', 1000), Row('c', 1000)])] + expected_result.sort() + collected_result.sort() + 
self.assertEqual(expected_result, collected_result) + + ds = self.t_env.to_data_stream(table) + ds.key_by(lambda k: k.c, key_type=Types.STRING()) \ + .window(MyTumblingEventTimeWindow()) \ + .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \ + .add_sink(self.test_sink) + self.env.execute() + expected_results = ['(a,47)', '(c,1000)', '(c,1000)'] + actual_results = self.test_sink.get_results(False) + expected_results.sort() + actual_results.sort() + self.assertEqual(expected_results, actual_results) + def test_to_append_stream(self): self.env.set_parallelism(1) t_env = StreamTableEnvironment.create( @@ -480,3 +603,81 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase): t_env.drop_function("agg_func") t_env.drop_temporary_function("table_func") self.assert_equals(t_env.list_user_defined_functions(), []) + + +class MyTimestampAssigner(TimestampAssigner): + + def extract_timestamp(self, value, record_timestamp) -> int: + return int(value[0]) + + +class MyTumblingEventTimeWindow(MergingWindowAssigner[tuple, TimeWindow]): + + def merge_windows(self, + windows, + callback: 'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None: + window_list = [w for w in windows] + window_list.sort() + for i in range(1, len(window_list)): + if window_list[i - 1].end > window_list[i].start: + callback.merge([window_list[i - 1], window_list[i]], + TimeWindow(window_list[i - 1].start, window_list[i].end)) + + def assign_windows(self, + element: tuple, + timestamp: int, + context): + return [TimeWindow(timestamp, timestamp + 5)] + + def get_default_trigger(self, env) -> Trigger[tuple, TimeWindow]: + return SimpleTimeWindowTrigger() + + def get_window_serializer(self) -> TypeSerializer[TimeWindow]: + return TimeWindowSerializer() + + def is_event_time(self) -> bool: + return True + + +class SimpleTimeWindowTrigger(Trigger[tuple, TimeWindow]): + + def on_element(self, + element: tuple, + timestamp: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') 
-> TriggerResult: + return TriggerResult.CONTINUE + + def on_processing_time(self, + time: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: + return TriggerResult.CONTINUE + + def on_event_time(self, + time: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: + if time >= window.max_timestamp(): + return TriggerResult.FIRE_AND_PURGE + else: + return TriggerResult.CONTINUE + + def on_merge(self, + window: TimeWindow, + ctx: 'Trigger.OnMergeContext') -> None: + pass + + def clear(self, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> None: + pass + + +class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]): + + def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]): + result = 0 + for i in inputs: + result += i[1] + return [(key, result)] diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py index d54dc52..a38c536 100644 --- a/flink-python/pyflink/table/types.py +++ b/flink-python/pyflink/table/types.py @@ -1284,6 +1284,16 @@ class RowType(DataType): return _create_row(self.names, values) +class RawType(DataType): + """ + Logical type of pickled byte array type. + """ + + def from_sql_type(self, obj): + import pickle + return pickle.loads(obj) + + class UserDefinedType(DataType): """ User-defined type (UDT). @@ -1878,6 +1888,8 @@ def _from_java_type(j_data_type): else: raise TypeError("Unsupported type: %s, it is recognized as a legacy type." 
% type_info) + elif is_instance_of(logical_type, gateway.jvm.RawType): + data_type = RawType() else: raise TypeError("Unsupported type: %s, it is not supported yet in current python type" " system" % j_data_type) diff --git a/flink-python/pyflink/table/utils.py b/flink-python/pyflink/table/utils.py index 9f25601..d9ecc3a 100644 --- a/flink-python/pyflink/table/utils.py +++ b/flink-python/pyflink/table/utils.py @@ -21,7 +21,7 @@ from pyflink.common.types import RowKind from pyflink.java_gateway import get_gateway from pyflink.table.types import DataType, LocalZonedTimestampType, Row, RowType, \ - TimeType, DateType, ArrayType, MapType, TimestampType, FloatType + TimeType, DateType, ArrayType, MapType, TimestampType, FloatType, RawType from pyflink.util.java_utils import to_jarray import datetime import pickle @@ -140,5 +140,7 @@ def pickled_bytes_to_python_converter(data, field_type: DataType): for element_bytes in data: elements.append(pickled_bytes_to_python_converter(element_bytes, element_type)) return elements + elif isinstance(field_type, RawType): + return field_type.from_sql_type(data) else: return field_type.from_sql_type(data) diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java index 9959f53..5cfa257 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeutils.base.CharSerializer; import org.apache.flink.api.common.typeutils.base.DoubleSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; +import org.apache.flink.api.common.typeutils.base.InstantSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import 
org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -55,11 +56,13 @@ import org.apache.flink.api.java.typeutils.runtime.RowSerializer; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.fnexecution.v1.FlinkFnApi; import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo; +import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo; import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer; import org.apache.flink.table.runtime.typeutils.serializers.python.DateSerializer; import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer; import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer; import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer; +import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter; import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; @@ -87,7 +90,8 @@ public class PythonTypeUtils { FlinkFnApi.TypeInfo.TypeName.DOUBLE, FlinkFnApi.TypeInfo.TypeName.CHAR, FlinkFnApi.TypeInfo.TypeName.BIG_INT, - FlinkFnApi.TypeInfo.TypeName.BIG_DEC); + FlinkFnApi.TypeInfo.TypeName.BIG_DEC, + FlinkFnApi.TypeInfo.TypeName.INSTANT); private static final Set<FlinkFnApi.TypeInfo.TypeName> primitiveArrayElementTypeNames = Sets.newHashSet( @@ -162,6 +166,12 @@ public class PythonTypeUtils { return buildListTypeProto((ListTypeInfo<?>) typeInformation); } + if (typeInformation instanceof ExternalTypeInfo) { + return toTypeInfoProto( + LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo( + ((ExternalTypeInfo<?>) typeInformation).getDataType())); + } + throw new UnsupportedOperationException( String.format( "The type information: %s is not supported in PyFlink currently.", @@ -336,6 +346,10 @@ public class PythonTypeUtils { return FlinkFnApi.TypeInfo.TypeName.BIG_DEC; } + if 
(typeInfo.equals(BasicTypeInfo.INSTANT_TYPE_INFO)) { + return FlinkFnApi.TypeInfo.TypeName.INSTANT; + } + if (typeInfo instanceof PrimitiveArrayTypeInfo) { return FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY; } @@ -413,6 +427,8 @@ public class PythonTypeUtils { BasicTypeInfo.BIG_DEC_TYPE_INFO.getTypeClass(), BigDecSerializer.INSTANCE); typeInfoToSerializerMap.put( BasicTypeInfo.BYTE_TYPE_INFO.getTypeClass(), ByteSerializer.INSTANCE); + typeInfoToSerializerMap.put( + BasicTypeInfo.INSTANT_TYPE_INFO.getTypeClass(), InstantSerializer.INSTANCE); typeInfoToSerializerMap.put( PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO.getTypeClass(), @@ -524,6 +540,13 @@ public class PythonTypeUtils { ((ListTypeInfo<?>) typeInformation) .getElementTypeInfo())); } + + if (typeInformation instanceof ExternalTypeInfo) { + return (TypeSerializer<T>) + typeInfoSerializerConverter( + LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo( + ((ExternalTypeInfo<?>) typeInformation).getDataType())); + } } throw new UnsupportedOperationException(
