This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a1c567cab145e410030f352de6581ece55f1089
Author: huangxingbo <[email protected]>
AuthorDate: Thu Jul 22 10:17:52 2021 +0800

    [FLINK-22911][python] Rework 
from_data_stream/create_temporary_view/to_data_stream of TableEnvironment in 
Python Table API
    
    This closes #16611.
---
 flink-python/pyflink/common/__init__.py            |   5 +-
 flink-python/pyflink/common/time.py                |  32 +++
 flink-python/pyflink/common/typeinfo.py            |  65 +++++
 flink-python/pyflink/common/types.py               |  10 +
 flink-python/pyflink/datastream/data_stream.py     |   2 +-
 .../pyflink/fn_execution/coder_impl_fast.pxd       |   6 +
 .../pyflink/fn_execution/coder_impl_fast.pyx       |  29 +++
 .../pyflink/fn_execution/coder_impl_slow.py        |  26 ++
 flink-python/pyflink/fn_execution/coders.py        |  19 +-
 .../pyflink/fn_execution/flink_fn_execution_pb2.py |  54 ++--
 .../pyflink/fn_execution/tests/test_coders.py      |   8 +-
 .../pyflink/proto/flink-fn-execution.proto         |   1 +
 flink-python/pyflink/table/table_environment.py    | 288 ++++++++++++++++++---
 .../table/tests/test_table_environment_api.py      | 211 ++++++++++++++-
 flink-python/pyflink/table/types.py                |  12 +
 flink-python/pyflink/table/utils.py                |   4 +-
 .../flink/streaming/api/utils/PythonTypeUtils.java |  25 +-
 17 files changed, 726 insertions(+), 71 deletions(-)

diff --git a/flink-python/pyflink/common/__init__.py 
b/flink-python/pyflink/common/__init__.py
index daeec0e..d60c5e3 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -35,7 +35,7 @@ from pyflink.common.job_status import JobStatus
 from pyflink.common.restart_strategy import RestartStrategies, 
RestartStrategyConfiguration
 from pyflink.common.typeinfo import Types, TypeInformation
 from pyflink.common.types import Row, RowKind
-from pyflink.common.time import Duration
+from pyflink.common.time import Duration, Instant
 from pyflink.common.watermark_strategy import WatermarkStrategy
 
 __all__ = [
@@ -57,5 +57,6 @@ __all__ = [
     "Duration",
     "WatermarkStrategy",
     "Types",
-    "TypeInformation"
+    "TypeInformation",
+    "Instant",
 ]
diff --git a/flink-python/pyflink/common/time.py 
b/flink-python/pyflink/common/time.py
index 9e2324d..41e4f0f 100644
--- a/flink-python/pyflink/common/time.py
+++ b/flink-python/pyflink/common/time.py
@@ -17,6 +17,8 @@
 
################################################################################
 from pyflink.java_gateway import get_gateway
 
+__all__ = ['Duration', 'Instant']
+
 
 class Duration(object):
     """
@@ -51,3 +53,33 @@ class Duration(object):
 
     def __eq__(self, other):
         return isinstance(other, Duration) and 
self._j_duration.equals(other._j_duration)
+
+
+class Instant(object):
+    """
+    An instantaneous point on the time-line. Similar to java.time.Instant.
+    """
+
+    def __init__(self, seconds, nanos):
+        self.seconds = seconds
+        self.nanos = nanos
+
+    def to_epoch_milli(self):
+        if self.seconds < 0 < self.nanos:
+            return (self.seconds + 1) * 1000 + self.nanos // 1000_000 - 1000
+        else:
+            return self.seconds * 1000 + self.nanos // 1000_000
+
+    @staticmethod
+    def of_epoch_milli(epoch_milli: int) -> 'Instant':
+        secs = epoch_milli // 1000
+        mos = epoch_milli % 1000
+        return Instant(secs, mos * 1000_000)
+
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__ and
+                self.seconds == other.seconds and
+                self.nanos == other.nanos)
+
+    def __repr__(self):
+        return 'Instant<{}, {}>'.format(self.seconds, self.nanos)
diff --git a/flink-python/pyflink/common/typeinfo.py 
b/flink-python/pyflink/common/typeinfo.py
index 5671ac9..66f7d1d 100644
--- a/flink-python/pyflink/common/typeinfo.py
+++ b/flink-python/pyflink/common/typeinfo.py
@@ -89,6 +89,7 @@ class BasicType(Enum):
     CHAR = "Char"
     BIG_INT = "BigInteger"
     BIG_DEC = "BigDecimal"
+    INSTANT = "Instant"
 
 
 class BasicTypeInfo(TypeInformation):
@@ -126,6 +127,8 @@ class BasicTypeInfo(TypeInformation):
                 self._j_typeinfo = JBasicTypeInfo.BIG_INT_TYPE_INFO
             elif self._basic_type == BasicType.BIG_DEC:
                 self._j_typeinfo = JBasicTypeInfo.BIG_DEC_TYPE_INFO
+            elif self._basic_type == BasicType.INSTANT:
+                self._j_typeinfo = JBasicTypeInfo.INSTANT_TYPE_INFO
             else:
                 raise TypeError("Invalid BasicType %s." % self._basic_type)
         return self._j_typeinfo
@@ -182,6 +185,28 @@ class BasicTypeInfo(TypeInformation):
     def BIG_DEC_TYPE_INFO():
         return BasicTypeInfo(BasicType.BIG_DEC)
 
+    @staticmethod
+    def INSTANT_TYPE_INFO():
+        return InstantTypeInfo(BasicType.INSTANT)
+
+
+class InstantTypeInfo(BasicTypeInfo):
+    """
+    InstantTypeInfo enables users to get Instant TypeInfo.
+    """
+    def __init__(self, basic_type: BasicType):
+        super(InstantTypeInfo, self).__init__(basic_type)
+
+    def need_conversion(self):
+        return True
+
+    def to_internal_type(self, obj):
+        return obj.to_epoch_milli() * 1000
+
+    def from_internal_type(self, obj):
+        from pyflink.common.time import Instant
+        return Instant.of_epoch_milli(obj // 1000)
+
 
 class SqlTimeTypeInfo(TypeInformation):
     """
@@ -642,6 +667,30 @@ class MapTypeInfo(TypeInformation):
         return 'MapTypeInfo<{}, {}>'.format(self._key_type_info, 
self._value_type_info)
 
 
+class ExternalTypeInfo(TypeInformation):
+    def __init__(self, type_info: TypeInformation):
+        super(ExternalTypeInfo, self).__init__()
+        self._type_info = type_info
+
+    def get_java_type_info(self) -> JavaObject:
+        if not self._j_typeinfo:
+            gateway = get_gateway()
+            TypeInfoDataTypeConverter = \
+                
gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
+            JExternalTypeInfo = \
+                
gateway.jvm.org.apache.flink.table.runtime.typeutils.ExternalTypeInfo
+
+            j_data_type = 
TypeInfoDataTypeConverter.toDataType(self._type_info.get_java_type_info())
+            self._j_typeinfo = JExternalTypeInfo.of(j_data_type)
+        return self._j_typeinfo
+
+    def __eq__(self, other):
+        return self.__class__ == other.__class__ and self._type_info == 
other._type_info
+
+    def __repr__(self):
+        return 'ExternalTypeInfo<{}>'.format(self._type_info)
+
+
 class Types(object):
     """
     This class gives access to the type information of the most common types 
for which Flink has
@@ -726,6 +775,13 @@ class Types(object):
         return BasicTypeInfo.BIG_DEC_TYPE_INFO()
 
     @staticmethod
+    def INSTANT() -> TypeInformation:
+        """
+        Returns type information for Instant. Supports a None value.
+        """
+        return BasicTypeInfo.INSTANT_TYPE_INFO()
+
+    @staticmethod
     def SQL_DATE() -> TypeInformation:
         """
         Returns type information for Date. Supports a None value.
@@ -863,6 +919,8 @@ def _from_java_type(j_type_info: JavaObject) -> 
TypeInformation:
         return Types.BIG_INT()
     elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_DEC_TYPE_INFO):
         return Types.BIG_DEC()
+    elif _is_instance_of(j_type_info, JBasicTypeInfo.INSTANT_TYPE_INFO):
+        return Types.INSTANT()
 
     JSqlTimeTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
     if _is_instance_of(j_type_info, JSqlTimeTypeInfo.DATE):
@@ -951,6 +1009,13 @@ def _from_java_type(j_type_info: JavaObject) -> 
TypeInformation:
         j_element_type_info = j_type_info.getElementTypeInfo()
         return ListTypeInfo(_from_java_type(j_element_type_info))
 
+    JExternalTypeInfo = 
gateway.jvm.org.apache.flink.table.runtime.typeutils.ExternalTypeInfo
+    if _is_instance_of(j_type_info, JExternalTypeInfo):
+        TypeInfoDataTypeConverter = \
+            
gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
+        return ExternalTypeInfo(_from_java_type(
+            
TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
+
     raise TypeError("The java type info: %s is not supported in PyFlink 
currently." % j_type_info)
 
 
diff --git a/flink-python/pyflink/common/types.py 
b/flink-python/pyflink/common/types.py
index 608a301..19187af 100644
--- a/flink-python/pyflink/common/types.py
+++ b/flink-python/pyflink/common/types.py
@@ -29,6 +29,16 @@ class RowKind(Enum):
     UPDATE_AFTER = 2
     DELETE = 3
 
+    def __str__(self):
+        if self.value == RowKind.INSERT.value:
+            return '+I'
+        elif self.value == RowKind.UPDATE_BEFORE.value:
+            return '-U'
+        elif self.value == RowKind.UPDATE_AFTER.value:
+            return '+U'
+        else:
+            return '-D'
+
 
 def _create_row(fields, values, row_kind: RowKind = None):
     row = Row(*values)
diff --git a/flink-python/pyflink/datastream/data_stream.py 
b/flink-python/pyflink/datastream/data_stream.py
index 325eb59..04bbf7e 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -41,7 +41,7 @@ from pyflink.datastream.utils import convert_to_python_obj
 from pyflink.java_gateway import get_gateway
 
 
-__all__ = ['CloseableIterator']
+__all__ = ['CloseableIterator', 'DataStream']
 
 
 class DataStream(object):
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd 
b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
index a6de3fc..de6c373 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd
@@ -17,6 +17,8 @@
 
################################################################################
 # cython: language_level=3
 
+from libc.stdint cimport int32_t, int64_t
+
 from pyflink.fn_execution.stream_fast cimport LengthPrefixInputStream, 
LengthPrefixOutputStream, \
     InputStream, OutputStream
 
@@ -136,6 +138,10 @@ cdef class TimestampCoderImpl(FieldCoderImpl):
 cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
     cdef object _timezone
 
+cdef class InstantCoderImpl(FieldCoderImpl):
+    cdef int64_t _null_seconds
+    cdef int32_t _null_nanos
+
 cdef class CloudPickleCoderImpl(FieldCoderImpl):
     pass
 
diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx 
b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
index 719756f..7bf6f64 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
@@ -30,6 +30,7 @@ from typing import List, Union
 from cloudpickle import cloudpickle
 
 from pyflink.common import Row, RowKind
+from pyflink.common.time import Instant
 from pyflink.datastream.window import CountWindow, TimeWindow
 
 ROW_KIND_BIT_SIZE = 2
@@ -646,6 +647,34 @@ cdef class 
LocalZonedTimestampCoderImpl(TimestampCoderImpl):
     cpdef decode_from_stream(self, InputStream in_stream, size_t size):
         return 
self._timezone.localize(self._decode_timestamp_data_from_stream(in_stream))
 
+cdef class InstantCoderImpl(FieldCoderImpl):
+    """
+    A coder for Instant.
+    """
+
+    def __init__(self):
+        self._null_seconds = -9223372036854775808
+        self._null_nanos = -2147483648
+
+    cpdef encode_to_stream(self, value, OutputStream out_stream):
+        if value is None:
+            out_stream.write_int64(self._null_seconds)
+            out_stream.write_int32(self._null_nanos)
+        else:
+            out_stream.write_int64(value.seconds)
+            out_stream.write_int32(value.nanos)
+
+    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
+        cdef int64_t seconds
+        cdef int32_t nanos
+        seconds = in_stream.read_int64()
+        nanos = in_stream.read_int32()
+        if seconds == self._null_seconds and nanos == self._null_nanos:
+            return None
+        else:
+            return Instant(seconds, nanos)
+
+
 cdef class CloudPickleCoderImpl(FieldCoderImpl):
     """
     A coder used with cloudpickle for all kinds of python object.
diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py 
b/flink-python/pyflink/fn_execution/coder_impl_slow.py
index 22bb995..b9789b3 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_slow.py
+++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py
@@ -25,6 +25,7 @@ import cloudpickle
 import pyarrow as pa
 
 from pyflink.common import Row, RowKind
+from pyflink.common.time import Instant
 from pyflink.datastream.window import TimeWindow, CountWindow
 from pyflink.fn_execution.ResettableIO import ResettableIO
 from pyflink.fn_execution.stream_slow import InputStream, OutputStream
@@ -589,6 +590,31 @@ class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
                 milliseconds, nanoseconds))
 
 
+class InstantCoderImpl(FieldCoderImpl):
+    """
+    A coder for Instant.
+    """
+    def __init__(self):
+        self._null_seconds = -9223372036854775808
+        self._null_nanos = -2147483648
+
+    def encode_to_stream(self, value: Instant, out_stream: OutputStream):
+        if value is None:
+            out_stream.write_int64(self._null_seconds)
+            out_stream.write_int32(self._null_nanos)
+        else:
+            out_stream.write_int64(value.seconds)
+            out_stream.write_int32(value.nanos)
+
+    def decode_from_stream(self, in_stream: InputStream, length: int = 0):
+        seconds = in_stream.read_int64()
+        nanos = in_stream.read_int32()
+        if seconds == self._null_seconds and nanos == self._null_nanos:
+            return None
+        else:
+            return Instant(seconds, nanos)
+
+
 class CloudPickleCoderImpl(FieldCoderImpl):
     """
     A coder used with cloudpickle for all kinds of python object.
diff --git a/flink-python/pyflink/fn_execution/coders.py 
b/flink-python/pyflink/fn_execution/coders.py
index 42502bb..7ba4aed 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -24,7 +24,8 @@ import pytz
 
 from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, 
DateTypeInfo, \
     TimeTypeInfo, TimestampTypeInfo, PrimitiveArrayTypeInfo, 
BasicArrayTypeInfo, TupleTypeInfo, \
-    MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, 
ObjectArrayTypeInfo
+    MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, 
ObjectArrayTypeInfo, \
+    ExternalTypeInfo
 from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.table.types import TinyIntType, SmallIntType, IntType, 
BigIntType, BooleanType, \
     FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, 
TimeType, \
@@ -37,7 +38,7 @@ except:
 
 __all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 
'BooleanCoder',
            'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder', 
'BinaryCoder', 'CharCoder',
-           'DateCoder', 'TimeCoder', 'TimestampCoder', 
'LocalZonedTimestampCoder',
+           'DateCoder', 'TimeCoder', 'TimestampCoder', 
'LocalZonedTimestampCoder', 'InstantCoder',
            'GenericArrayCoder', 'PrimitiveArrayCoder', 'MapCoder', 
'DecimalCoder',
            'BigDecimalCoder', 'TupleCoder', 'TimeWindowCoder', 
'CountWindowCoder',
            'PickleCoder', 'CloudPickleCoder', 'DataViewFilterCoder']
@@ -531,6 +532,14 @@ class LocalZonedTimestampCoder(FieldCoder):
                 self.timezone == other.timezone)
 
 
+class InstantCoder(FieldCoder):
+    """
+    Coder for Instant.
+    """
+    def get_impl(self) -> coder_impl.FieldCoderImpl:
+        return coder_impl.InstantCoderImpl()
+
+
 class CloudPickleCoder(FieldCoder):
     """
     Coder used with cloudpickle to encode python object.
@@ -665,7 +674,8 @@ _type_info_name_mappings = {
     type_info_name.SQL_DATE: DateCoder(),
     type_info_name.SQL_TIME: TimeCoder(),
     type_info_name.SQL_TIMESTAMP: TimestampCoder(3),
-    type_info_name.PICKLED_BYTES: CloudPickleCoder()
+    type_info_name.PICKLED_BYTES: CloudPickleCoder(),
+    type_info_name.INSTANT: InstantCoder()
 }
 
 
@@ -708,6 +718,7 @@ _basic_type_info_mappings = {
     BasicType.STRING: CharCoder(),
     BasicType.CHAR: CharCoder(),
     BasicType.BIG_DEC: BigDecimalCoder(),
+    BasicType.INSTANT: InstantCoder()
 }
 
 
@@ -746,5 +757,7 @@ def from_type_info(type_info: TypeInformation) -> 
FieldCoder:
         return RowCoder(
             [from_type_info(f) for f in type_info.get_field_types()],
             [f for f in type_info.get_field_names()])
+    elif isinstance(type_info, ExternalTypeInfo):
+        return from_type_info(type_info._type_info)
     else:
         raise ValueError("Unsupported type_info %s." % type_info)
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py 
b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index e895969..6b1fd05 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='flink-fn-execution.proto',
   package='org.apache.flink.fn_execution.v1',
   syntax='proto3',
-  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
 )
 
 
@@ -335,11 +335,15 @@ _TYPEINFO_TYPENAME = _descriptor.EnumDescriptor(
       name='OBJECT_ARRAY', index=21, number=21,
       options=None,
       type=None),
+    _descriptor.EnumValueDescriptor(
+      name='INSTANT', index=22, number=22,
+      options=None,
+      type=None),
   ],
   containing_type=None,
   options=None,
   serialized_start=5660,
-  serialized_end=5955,
+  serialized_end=5968,
 )
 _sym_db.RegisterEnumDescriptor(_TYPEINFO_TYPENAME)
 
@@ -380,8 +384,8 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = 
_descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=6727,
-  serialized_end=6860,
+  serialized_start=6740,
+  serialized_end=6873,
 )
 _sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
 
@@ -402,8 +406,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=7827,
-  serialized_end=7859,
+  serialized_start=7840,
+  serialized_end=7872,
 )
 _sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
 
@@ -1718,7 +1722,7 @@ _TYPEINFO = _descriptor.Descriptor(
       index=0, containing_type=None, fields=[]),
   ],
   serialized_start=4838,
-  serialized_end=5968,
+  serialized_end=5981,
 )
 
 
@@ -1755,8 +1759,8 @@ _USERDEFINEDDATASTREAMFUNCTION_JOBPARAMETER = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6343,
-  serialized_end=6385,
+  serialized_start=6356,
+  serialized_end=6398,
 )
 
 _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = _descriptor.Descriptor(
@@ -1834,8 +1838,8 @@ _USERDEFINEDDATASTREAMFUNCTION_RUNTIMECONTEXT = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=6388,
-  serialized_end=6724,
+  serialized_start=6401,
+  serialized_end=6737,
 )
 
 _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
@@ -1900,8 +1904,8 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=5971,
-  serialized_end=6860,
+  serialized_start=5984,
+  serialized_end=6873,
 )
 
 
@@ -1931,8 +1935,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7456,
-  serialized_end=7530,
+  serialized_start=7469,
+  serialized_end=7543,
 )
 
 _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -1961,8 +1965,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7532,
-  serialized_end=7599,
+  serialized_start=7545,
+  serialized_end=7612,
 )
 
 _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -1991,8 +1995,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7601,
-  serialized_end=7670,
+  serialized_start=7614,
+  serialized_end=7683,
 )
 
 _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2021,8 +2025,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7672,
-  serialized_end=7751,
+  serialized_start=7685,
+  serialized_end=7764,
 )
 
 _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2051,8 +2055,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7753,
-  serialized_end=7825,
+  serialized_start=7766,
+  serialized_end=7838,
 )
 
 _CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2127,8 +2131,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
       name='data_type', 
full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=6863,
-  serialized_end=7872,
+  serialized_start=6876,
+  serialized_end=7885,
 )
 
 _INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py 
b/flink-python/pyflink/fn_execution/tests/test_coders.py
index 1dbf396..e72bfaa 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders.py
@@ -25,7 +25,7 @@ from pyflink.fn_execution.coders import BigIntCoder, 
TinyIntCoder, BooleanCoder,
     SmallIntCoder, IntCoder, FloatCoder, DoubleCoder, BinaryCoder, CharCoder, 
DateCoder, \
     TimeCoder, TimestampCoder, GenericArrayCoder, MapCoder, DecimalCoder, 
FlattenRowCoder,\
     RowCoder, LocalZonedTimestampCoder, BigDecimalCoder, TupleCoder, 
PrimitiveArrayCoder,\
-    TimeWindowCoder, CountWindowCoder
+    TimeWindowCoder, CountWindowCoder, InstantCoder
 from pyflink.datastream.window import TimeWindow, CountWindow
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 
@@ -110,6 +110,12 @@ class CodersTest(PyFlinkTestCase):
         self.check_coder(coder,
                          timezone.localize(datetime.datetime(2019, 9, 10, 18, 
30, 20, 123456)))
 
+    def test_instant_coder(self):
+        from pyflink.common.time import Instant
+
+        coder = InstantCoder()
+        self.check_coder(coder, Instant(100, 2000), None, 
Instant(-9223372036854775808, 0))
+
     def test_array_coder(self):
         element_coder = BigIntCoder()
         coder = GenericArrayCoder(element_coder)
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto 
b/flink-python/pyflink/proto/flink-fn-execution.proto
index 1a6d6b6..d51998b 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -309,6 +309,7 @@ message TypeInfo {
     MAP = 19;
     PICKLED_BYTES = 20;
     OBJECT_ARRAY = 21;
+    INSTANT = 22;
   }
 
   message MapTypeInfo {
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 77504b5..fcb9116 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -33,7 +33,7 @@ from pyflink.common import JobExecutionResult
 from pyflink.java_gateway import get_gateway
 from pyflink.serializers import BatchedSerializer, PickleSerializer
 from pyflink.table import Table, EnvironmentSettings, Expression, 
ExplainDetail, \
-    Module, ModuleEntry, TableSink
+    Module, ModuleEntry, TableSink, Schema
 from pyflink.table.catalog import Catalog
 from pyflink.table.serializers import ArrowSerializer
 from pyflink.table.statement_set import StatementSet
@@ -1171,23 +1171,133 @@ class TableEnvironment(object):
         else:
             self._j_tenv.registerFunction(name, java_function)
 
-    def create_temporary_view(self, view_path: str, table: Table):
+    def create_temporary_view(self,
+                              view_path: str,
+                              table_or_data_stream: Union[Table, DataStream],
+                              *fields_or_schema: Union[str, Expression, 
Schema]):
         """
-        Registers a :class:`~pyflink.table.Table` API object as a temporary 
view similar to SQL
-        temporary views.
+        1. When table_or_data_stream is a :class:`~pyflink.table.Table`:
 
-        Temporary objects can shadow permanent ones. If a permanent object in 
a given path exists,
-        it will be inaccessible in the current session. To make the permanent 
object available
-        again you can drop the corresponding temporary object.
+            Registers a :class:`~pyflink.table.Table` API object as a 
temporary view similar to SQL
+            temporary views.
+
+            Temporary objects can shadow permanent ones. If a permanent object 
in a given path
+            exists, it will be inaccessible in the current session. To make 
the permanent object
+            available again you can drop the corresponding temporary object.
+
+        2. When table_or_data_stream is a 
:class:`~pyflink.datastream.DataStream`:
+
+            2.1 When fields_or_schema is a str or a sequence of 
:class:`~pyflink.table.Expression`:
+
+                Creates a view from the given {@link DataStream} in a given 
path with specified
+                field names. Registered views can be referenced in SQL queries.
+
+                1. Reference input fields by name: All fields in the schema 
definition are
+                referenced by name (and possibly renamed using an alias (as). 
Moreover, we can
+                define proctime and rowtime attributes at arbitrary positions 
using arbitrary names
+                (except those that exist in the result schema). In this mode, 
fields can be
+                reordered and projected out. This mode can be used for any 
input type, including
+                POJOs.
+
+                Example:
+                ::
+
+                    >>> stream = ...
+                    # reorder the fields, rename the original 'f0' field to 
'name' and add
+                    # event-time attribute named 'rowtime'
+
+                    # use str
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable",
+                    ...     stream,
+                    ...     "f1, rowtime.rowtime, f0 as 'name'")
+
+                    # or use a sequence of expression
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable",
+                    ...     stream,
+                    ...     col("f1"),
+                    ...     col("rowtime").rowtime,
+                    ...     col("f0").alias('name'))
+
+                2. Reference input fields by position: In this mode, fields 
are simply renamed.
+                Event-time attributes can replace the field on their position 
in the input data
+                (if it is of correct type) or be appended at the end. Proctime 
attributes must be
+                appended at the end. This mode can only be used if the input 
type has a defined
+                field order (tuple, case class, Row) and none of the {@code 
fields} references a
+                field of the input type.
+
+                Example:
+                ::
+
+                    >>> stream = ...
+                    # rename the original fields to 'a' and 'b' and extract 
the internally attached
+                    # timestamp into an event-time attribute named 'rowtime'
+
+                    # use str
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable", stream, "a, b, rowtime.rowtime")
+
+                    # or use a sequence of expressions
+                    >>> table_env.create_temporary_view(
+                    ...     "cat.db.myTable",
+                    ...     stream,
+                    ...     col("a"),
+                    ...     col("b"),
+                    ...     col("rowtime").rowtime)
+
+                Temporary objects can shadow permanent ones. If a permanent 
object in a given path
+                exists, it will be inaccessible in the current session. To 
make the permanent object
+                available again you can drop the corresponding temporary 
object.
+
+            2.2 When fields_or_schema is a :class:`~pyflink.table.Schema`:
+
+                Creates a view from the given {@link DataStream} in a given 
path. Registered views
+                can be referenced in SQL queries.
+
+                See :func:`from_data_stream` for more information on how a
+                :class:`~pyflink.datastream.DataStream` is translated into a 
table.
+
+                Temporary objects can shadow permanent ones. If a permanent 
object in a given path
+                exists, it will be inaccessible in the current session. To 
make the permanent object
+                available again you can drop the corresponding temporary 
object.
+
+                .. note:: create_temporary_view by providing a Schema (case 
2.) was added from flink
+                    1.14.0.
 
         :param view_path: The path under which the view will be registered. 
See also the
                           :class:`~pyflink.table.TableEnvironment` class 
description for the format
                           of the path.
-        :param table: The view to register.
+        :param table_or_data_stream: The Table or DataStream out of which to 
create the view.
+        :param fields_or_schema: The fields expressions(str) to map original 
fields of the
+                        DataStream to the fields of the View or the customized 
schema for the final
+                        table.
 
         .. versionadded:: 1.10.0
         """
-        self._j_tenv.createTemporaryView(view_path, table._j_table)
+        if isinstance(table_or_data_stream, Table):
+            self._j_tenv.createTemporaryView(view_path, 
table_or_data_stream._j_table)
+        elif len(fields_or_schema) == 0:
+            self._j_tenv.createTemporaryView(view_path, 
table_or_data_stream._j_data_stream)
+        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], 
str):
+            self._j_tenv.createTemporaryView(
+                view_path,
+                table_or_data_stream._j_data_stream,
+                fields_or_schema[0])
+        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], 
Schema):
+            self._j_tenv.createTemporaryView(
+                view_path,
+                table_or_data_stream._j_data_stream,
+                fields_or_schema[0]._j_schema)
+        elif (len(fields_or_schema) > 0 and
+              all(isinstance(elem, Expression) for elem in fields_or_schema)):
+            self._j_tenv.createTemporaryView(
+                view_path,
+                table_or_data_stream._j_data_stream,
+                to_expression_jarray(fields_or_schema))
+        else:
+            raise ValueError("Invalid arguments for 'fields': %r" %
+                             ','.join([repr(item) for item in 
fields_or_schema]))
 
     def add_python_file(self, file_path: str):
         """
@@ -1725,27 +1835,115 @@ class StreamTableEnvironment(TableEnvironment):
                     
stream_execution_environment._j_stream_execution_environment)
         return StreamTableEnvironment(j_tenv)
 
-    def from_data_stream(self, data_stream: DataStream, *fields: Union[str, 
Expression]) -> Table:
+    def from_data_stream(self,
+                         data_stream: DataStream,
+                         *fields_or_schema: Union[str, Expression, Schema]) -> 
Table:
         """
-        Converts the given DataStream into a Table with specified field names.
+        1. When fields_or_schema is a str or a sequence of Expression:
+
+            Converts the given DataStream into a Table with specified field 
names.
+
+            There are two modes for mapping original fields to the fields of 
the Table:
+
+                1. Reference input fields by name:
+
+                All fields in the schema definition are referenced by name 
(and possibly renamed
+                using an alias (as)). Moreover, we can define proctime and 
rowtime attributes at
+                arbitrary positions using arbitrary names (except those that 
exist in the result
+                schema). In this mode, fields can be reordered and projected 
out. This mode can be
+                used for any input type.
+
+                2. Reference input fields by position:
+
+                In this mode, fields are simply renamed. Event-time attributes 
can replace the field
+                on their position in the input data (if it is of correct type) 
or be appended at the
+                end. Proctime attributes must be appended at the end. This 
mode can only be used if
+                the input type has a defined field order (tuple, case class, 
Row) and none of the
+                fields references a field of the input type.
+
+        2. When fields_or_schema is a Schema:
+
+            Converts the given DataStream into a Table.
+
+            Column names and types of the Table are automatically derived from 
the TypeInformation
+            of the DataStream. If the outermost record's TypeInformation is a 
CompositeType, it will
+            be flattened in the first level. Composite nested fields will not 
be accessible.
 
-        There are two modes for mapping original fields to the fields of the 
Table:
-            1. Reference input fields by name:
-            All fields in the schema definition are referenced by name (and 
possibly renamed using
-            and alias (as). Moreover, we can define proctime and rowtime 
attributes at arbitrary
-            positions using arbitrary names (except those that exist in the 
result schema). In this
-            mode, fields can be reordered and projected out. This mode can be 
used for any input
-            type.
-            2. Reference input fields by position:
-            In this mode, fields are simply renamed. Event-time attributes can 
replace the field on
-            their position in the input data (if it is of correct type) or be 
appended at the end.
-            Proctime attributes must be appended at the end. This mode can 
only be used if the input
-            type has a defined field order (tuple, case class, Row) and none 
of the fields
-            references a field of the input type.
+            Since the DataStream API does not support changelog processing 
natively, this method
+            assumes append-only/insert-only semantics during the 
stream-to-table conversion. Records
+            of class Row must describe RowKind.INSERT changes.
+
+            By default, the stream record's timestamp and watermarks are not 
propagated unless
+            explicitly declared.
+
+            This method allows to declare a Schema for the resulting table. 
The declaration is
+            similar to a ``CREATE TABLE`` DDL in SQL and allows to:
+
+                1. enrich or overwrite automatically derived columns with a 
custom DataType
+                2. reorder columns
+                3. add computed or metadata columns next to the physical 
columns
+                4. access a stream record's timestamp
+                5. declare a watermark strategy or propagate the DataStream 
watermarks
+
+            It is possible to declare a schema without physical/regular 
columns. In this case, those
+            columns will be automatically derived and implicitly put at the 
beginning of the schema
+            declaration.
+
+            The following examples illustrate common schema declarations and 
their semantics:
+
+            Example:
+            ::
+
+                === EXAMPLE 1 ===
+
+                no physical columns defined, they will be derived 
automatically,
+                e.g. BigDecimal becomes DECIMAL(38, 18)
+
+                >>> Schema.new_builder() \
+                ...     .column_by_expression("c1", "f1 + 42") \
+                ...     .column_by_expression("c2", "f1 - 1") \
+                ...     .build()
+
+                equal to: CREATE TABLE (f0 STRING, f1 DECIMAL(38, 18), c1 AS 
f1 + 42, c2 AS f1 - 1)
+
+                === EXAMPLE 2 ===
+
+                physical columns defined, input fields and columns will be 
mapped by name,
+                columns are reordered and their data type overwritten,
+                all columns must be defined to show up in the final table's 
schema
+
+                >>> Schema.new_builder() \
+                ...     .column("f1", "DECIMAL(10, 2)") \
+                ...     .column_by_expression("c", "f1 - 1") \
+                ...     .column("f0", "STRING") \
+                ...     .build()
+
+                equal to: CREATE TABLE (f1 DECIMAL(10, 2), c AS f1 - 1, f0 
STRING)
+
+                === EXAMPLE 3 ===
+
+                timestamp and watermarks can be added from the DataStream API,
+                physical columns will be derived automatically
+
+                >>> Schema.new_builder() \
+                ...     .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)") \
+                ...     .watermark("rowtime", "SOURCE_WATERMARK()") \
+                ...     .build()
+
+                equal to:
+                    CREATE TABLE (
+                        f0 STRING,
+                        f1 DECIMAL(38, 18),
+                        rowtime TIMESTAMP(3) METADATA,
+                        WATERMARK FOR rowtime AS SOURCE_WATERMARK()
+                    )
+
+            .. note:: from_data_stream by providing a Schema (case 2.) 
was added in Flink
+                    1.14.0.
 
         :param data_stream: The datastream to be converted.
-        :param fields: The fields expressions to map original fields of the 
DataStream to the fields
-                       of the Table
+        :param fields_or_schema: The fields expressions to map original fields 
of the DataStream to
+            the fields of the Table or the customized schema for the final 
table.
         :return: The converted Table.
 
         .. versionadded:: 1.12.0
@@ -1753,17 +1951,43 @@ class StreamTableEnvironment(TableEnvironment):
         j_data_stream = data_stream._j_data_stream
         JPythonConfigUtil = 
get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil
         
JPythonConfigUtil.configPythonOperator(j_data_stream.getExecutionEnvironment())
-        if len(fields) == 0:
+        if len(fields_or_schema) == 0:
             return Table(j_table=self._j_tenv.fromDataStream(j_data_stream), 
t_env=self)
-        elif all(isinstance(f, Expression) for f in fields):
+        elif all(isinstance(f, Expression) for f in fields_or_schema):
             return Table(j_table=self._j_tenv.fromDataStream(
-                j_data_stream, to_expression_jarray(fields)), t_env=self)
-        elif len(fields) == 1 and isinstance(fields[0], str):
+                j_data_stream, to_expression_jarray(fields_or_schema)), 
t_env=self)
+        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], 
str):
             warnings.warn(
                 "Deprecated in 1.12. Use from_data_stream(DataStream, 
*Expression) instead.",
                 DeprecationWarning)
-            return Table(j_table=self._j_tenv.fromDataStream(j_data_stream, 
fields[0]), t_env=self)
-        raise ValueError("Invalid arguments for 'fields': %r" % fields)
+            return Table(j_table=self._j_tenv.fromDataStream(
+                j_data_stream, fields_or_schema[0]), t_env=self)
+        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], 
Schema):
+            return Table(j_table=self._j_tenv.fromDataStream(
+                j_data_stream, fields_or_schema[0]._j_schema), t_env=self)
+        raise ValueError("Invalid arguments for 'fields': %r" % 
fields_or_schema)
+
+    def to_data_stream(self, table: Table) -> DataStream:
+        """
+        Converts the given Table into a DataStream.
+
+        Since the DataStream API does not support changelog processing 
natively, this method
+        assumes append-only/insert-only semantics during the table-to-stream 
conversion. The records
+        of class Row will always describe RowKind.INSERT changes. Updating 
tables are
+        not supported by this method and will produce an exception.
+
+        Note that the type system of the table ecosystem is richer than the 
one of the DataStream
+        API. The table runtime will make sure to properly serialize the output 
records to the first
+        operator of the DataStream API. Afterwards, the Types semantics of the 
DataStream API
+        need to be considered.
+
+        If the input table contains a single rowtime column, it will be 
propagated into a stream
+        record's timestamp. Watermarks will be propagated as well.
+
+        :param table: The Table to convert.
+        :return: The converted DataStream.
+        """
+        return DataStream(self._j_tenv.toDataStream(table._j_table))
 
     def to_append_stream(self, table: Table, type_info: TypeInformation) -> 
DataStream:
         """
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py 
b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 1e85084..eaec37d 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -19,10 +19,16 @@ import datetime
 import decimal
 import sys
 from py4j.protocol import Py4JJavaError
+from typing import Iterable
 
-from pyflink.common import RowKind
+from pyflink.common import RowKind, WatermarkStrategy
+from pyflink.common.serializer import TypeSerializer
 from pyflink.common.typeinfo import Types
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import MergingWindowAssigner, TimeWindow, Trigger, 
TriggerResult
+from pyflink.datastream.functions import WindowFunction
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
+from pyflink.datastream.window import TimeWindowSerializer
 from pyflink.java_gateway import get_gateway
 from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, 
EnvironmentSettings, \
     Module, ResultKind, ModuleEntry
@@ -33,9 +39,9 @@ from pyflink.table.table_descriptor import TableDescriptor
 from pyflink.table.types import RowType, Row
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import \
-    PyFlinkBatchTableTestCase, PyFlinkStreamTableTestCase, \
-    _load_specific_flink_module_jars
+from pyflink.testing.test_case_utils import (
+    PyFlinkBatchTableTestCase, PyFlinkStreamTableTestCase, PyFlinkTestCase,
+    _load_specific_flink_module_jars, invoke_java_object_method)
 from pyflink.util.java_utils import get_j_env_configuration
 
 
@@ -228,7 +234,46 @@ class TableEnvironmentTest(object):
         self.assertEqual("fake", table.get_options().get("connector"))
 
 
-class DataStreamConversionTestCases(object):
+class DataStreamConversionTestCases(PyFlinkTestCase):
+
+    def setUp(self) -> None:
+        from pyflink.datastream import StreamExecutionEnvironment
+
+        super(DataStreamConversionTestCases, self).setUp()
+        self.env = StreamExecutionEnvironment.get_execution_environment()
+        self.t_env = StreamTableEnvironment.create(self.env)
+
+        self.env.set_parallelism(2)
+        config = invoke_java_object_method(
+            self.env._j_stream_execution_environment, "getConfiguration")
+        config.setString("akka.ask.timeout", "20 s")
+        self.t_env.get_config().get_configuration().set_string(
+            "python.fn-execution.bundle.size", "1")
+        self.test_sink = DataStreamTestSinkFunction()
+
+    def test_from_data_stream_atomic(self):
+        data_stream = self.env.from_collection([(1,), (2,), (3,), (4,), (5,)])
+        result = self.t_env.from_data_stream(data_stream).execute()
+        self.assertEqual("""(
+  `f0` RAW('[B', '...')
+)""",
+                         result._j_table_result.getResolvedSchema().toString())
+        with result.collect() as result:
+            collected_result = [str(item) for item in result]
+            expected_result = [item for item in map(str, [Row(1), Row(2), 
Row(3), Row(4), Row(5)])]
+            expected_result.sort()
+            collected_result.sort()
+            self.assertEqual(expected_result, collected_result)
+
+    def test_to_data_stream_atomic(self):
+        table = self.t_env.from_elements([(1,), (2,), (3,)], ["a"])
+        ds = self.t_env.to_data_stream(table)
+        ds.add_sink(self.test_sink)
+        self.env.execute()
+        results = self.test_sink.get_results(False)
+        results.sort()
+        expected = ['+I[1]', '+I[2]', '+I[3]']
+        self.assertEqual(expected, results)
 
     def test_from_data_stream(self):
         self.env.set_parallelism(1)
@@ -258,6 +303,84 @@ class DataStreamConversionTestCases(object):
         result = source_sink_utils.results()
         self.assert_equals(result, expected)
 
+    def test_from_data_stream_with_schema(self):
+        from pyflink.table import Schema
+
+        ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
+                                      type_info=Types.ROW_NAMED(
+                                          ["a", "b", "c"],
+                                          [Types.INT(), Types.STRING(), 
Types.STRING()]))
+
+        table = self.t_env.from_data_stream(ds,
+                                            Schema.new_builder()
+                                                  .column("a", DataTypes.INT())
+                                                  .column("b", 
DataTypes.STRING())
+                                                  .column("c", 
DataTypes.STRING())
+                                                  .build())
+        result = table.execute()
+        with result.collect() as result:
+            collected_result = [str(item) for item in result]
+            expected_result = [item for item in
+                               map(str, [Row(1, 'Hi', 'Hello'), Row(2, 
'Hello', 'Hi')])]
+            expected_result.sort()
+            collected_result.sort()
+            self.assertEqual(expected_result, collected_result)
+
+    def test_from_and_to_data_stream_event_time(self):
+        from pyflink.table import Schema
+
+        ds = self.env.from_collection([(1, 42, "a"), (2, 5, "a"), (3, 1000, 
"c"), (100, 1000, "c")],
+                                      Types.ROW_NAMED(
+                                          ["a", "b", "c"],
+                                          [Types.LONG(), Types.INT(), 
Types.STRING()]))
+        ds = ds.assign_timestamps_and_watermarks(
+            WatermarkStrategy.for_monotonous_timestamps()
+            .with_timestamp_assigner(MyTimestampAssigner()))
+
+        table = self.t_env.from_data_stream(ds,
+                                            Schema.new_builder()
+                                                  
.column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)")
+                                                  .watermark("rowtime", 
"SOURCE_WATERMARK()")
+                                                  .build())
+        self.assertEqual("""(
+  `a` BIGINT,
+  `b` INT,
+  `c` STRING,
+  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
+  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
+)""",
+                         table._j_table.getResolvedSchema().toString())
+        self.t_env.create_temporary_view("t",
+                                         ds,
+                                         Schema.new_builder()
+                                         .column_by_metadata("rowtime", 
"TIMESTAMP_LTZ(3)")
+                                         .watermark("rowtime", 
"SOURCE_WATERMARK()")
+                                         .build())
+
+        result = self.t_env.execute_sql("SELECT "
+                                        "c, SUM(b) "
+                                        "FROM t "
+                                        "GROUP BY c, TUMBLE(rowtime, INTERVAL 
'0.005' SECOND)")
+        with result.collect() as result:
+            collected_result = [str(item) for item in result]
+            expected_result = [item for item in
+                               map(str, [Row('a', 47), Row('c', 1000), 
Row('c', 1000)])]
+            expected_result.sort()
+            collected_result.sort()
+            self.assertEqual(expected_result, collected_result)
+
+        ds = self.t_env.to_data_stream(table)
+        ds.key_by(lambda k: k.c, key_type=Types.STRING()) \
+            .window(MyTumblingEventTimeWindow()) \
+            .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), 
Types.INT()])) \
+            .add_sink(self.test_sink)
+        self.env.execute()
+        expected_results = ['(a,47)', '(c,1000)', '(c,1000)']
+        actual_results = self.test_sink.get_results(False)
+        expected_results.sort()
+        actual_results.sort()
+        self.assertEqual(expected_results, actual_results)
+
     def test_to_append_stream(self):
         self.env.set_parallelism(1)
         t_env = StreamTableEnvironment.create(
@@ -480,3 +603,81 @@ class 
BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         t_env.drop_function("agg_func")
         t_env.drop_temporary_function("table_func")
         self.assert_equals(t_env.list_user_defined_functions(), [])
+
+
+class MyTimestampAssigner(TimestampAssigner):
+
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[0])
+
+
+class MyTumblingEventTimeWindow(MergingWindowAssigner[tuple, TimeWindow]):
+
+    def merge_windows(self,
+                      windows,
+                      callback: 
'MergingWindowAssigner.MergeCallback[TimeWindow]') -> None:
+        window_list = [w for w in windows]
+        window_list.sort()
+        for i in range(1, len(window_list)):
+            if window_list[i - 1].end > window_list[i].start:
+                callback.merge([window_list[i - 1], window_list[i]],
+                               TimeWindow(window_list[i - 1].start, 
window_list[i].end))
+
+    def assign_windows(self,
+                       element: tuple,
+                       timestamp: int,
+                       context):
+        return [TimeWindow(timestamp, timestamp + 5)]
+
+    def get_default_trigger(self, env) -> Trigger[tuple, TimeWindow]:
+        return SimpleTimeWindowTrigger()
+
+    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
+        return TimeWindowSerializer()
+
+    def is_event_time(self) -> bool:
+        return True
+
+
+class SimpleTimeWindowTrigger(Trigger[tuple, TimeWindow]):
+
+    def on_element(self,
+                   element: tuple,
+                   timestamp: int,
+                   window: TimeWindow,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.CONTINUE
+
+    def on_processing_time(self,
+                           time: int,
+                           window: TimeWindow,
+                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.CONTINUE
+
+    def on_event_time(self,
+                      time: int,
+                      window: TimeWindow,
+                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        if time >= window.max_timestamp():
+            return TriggerResult.FIRE_AND_PURGE
+        else:
+            return TriggerResult.CONTINUE
+
+    def on_merge(self,
+                 window: TimeWindow,
+                 ctx: 'Trigger.OnMergeContext') -> None:
+        pass
+
+    def clear(self,
+              window: TimeWindow,
+              ctx: 'Trigger.TriggerContext') -> None:
+        pass
+
+
+class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
+
+    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
+        result = 0
+        for i in inputs:
+            result += i[1]
+        return [(key, result)]
diff --git a/flink-python/pyflink/table/types.py 
b/flink-python/pyflink/table/types.py
index d54dc52..a38c536 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -1284,6 +1284,16 @@ class RowType(DataType):
         return _create_row(self.names, values)
 
 
+class RawType(DataType):
+    """
+    Logical type of pickled byte array type.
+    """
+
+    def from_sql_type(self, obj):
+        import pickle
+        return pickle.loads(obj)
+
+
 class UserDefinedType(DataType):
     """
     User-defined type (UDT).
@@ -1878,6 +1888,8 @@ def _from_java_type(j_data_type):
             else:
                 raise TypeError("Unsupported type: %s, it is recognized as a 
legacy type."
                                 % type_info)
+        elif is_instance_of(logical_type, gateway.jvm.RawType):
+            data_type = RawType()
         else:
             raise TypeError("Unsupported type: %s, it is not supported yet in 
current python type"
                             " system" % j_data_type)
diff --git a/flink-python/pyflink/table/utils.py 
b/flink-python/pyflink/table/utils.py
index 9f25601..d9ecc3a 100644
--- a/flink-python/pyflink/table/utils.py
+++ b/flink-python/pyflink/table/utils.py
@@ -21,7 +21,7 @@ from pyflink.common.types import RowKind
 
 from pyflink.java_gateway import get_gateway
 from pyflink.table.types import DataType, LocalZonedTimestampType, Row, 
RowType, \
-    TimeType, DateType, ArrayType, MapType, TimestampType, FloatType
+    TimeType, DateType, ArrayType, MapType, TimestampType, FloatType, RawType
 from pyflink.util.java_utils import to_jarray
 import datetime
 import pickle
@@ -140,5 +140,7 @@ def pickled_bytes_to_python_converter(data, field_type: 
DataType):
             for element_bytes in data:
                 
elements.append(pickled_bytes_to_python_converter(element_bytes, element_type))
             return elements
+        elif isinstance(field_type, RawType):
+            return field_type.from_sql_type(data)
         else:
             return field_type.from_sql_type(data)
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
index 9959f53..5cfa257 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.api.common.typeutils.base.CharSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.common.typeutils.base.InstantSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -55,11 +56,13 @@ import 
org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
+import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
 import 
org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
 import 
org.apache.flink.table.runtime.typeutils.serializers.python.DateSerializer;
 import 
org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
 import 
org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
 import 
org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
 
@@ -87,7 +90,8 @@ public class PythonTypeUtils {
                         FlinkFnApi.TypeInfo.TypeName.DOUBLE,
                         FlinkFnApi.TypeInfo.TypeName.CHAR,
                         FlinkFnApi.TypeInfo.TypeName.BIG_INT,
-                        FlinkFnApi.TypeInfo.TypeName.BIG_DEC);
+                        FlinkFnApi.TypeInfo.TypeName.BIG_DEC,
+                        FlinkFnApi.TypeInfo.TypeName.INSTANT);
 
         private static final Set<FlinkFnApi.TypeInfo.TypeName> 
primitiveArrayElementTypeNames =
                 Sets.newHashSet(
@@ -162,6 +166,12 @@ public class PythonTypeUtils {
                 return buildListTypeProto((ListTypeInfo<?>) typeInformation);
             }
 
+            if (typeInformation instanceof ExternalTypeInfo) {
+                return toTypeInfoProto(
+                        LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+                                ((ExternalTypeInfo<?>) 
typeInformation).getDataType()));
+            }
+
             throw new UnsupportedOperationException(
                     String.format(
                             "The type information: %s is not supported in 
PyFlink currently.",
@@ -336,6 +346,10 @@ public class PythonTypeUtils {
                 return FlinkFnApi.TypeInfo.TypeName.BIG_DEC;
             }
 
+            if (typeInfo.equals(BasicTypeInfo.INSTANT_TYPE_INFO)) {
+                return FlinkFnApi.TypeInfo.TypeName.INSTANT;
+            }
+
             if (typeInfo instanceof PrimitiveArrayTypeInfo) {
                 return FlinkFnApi.TypeInfo.TypeName.PRIMITIVE_ARRAY;
             }
@@ -413,6 +427,8 @@ public class PythonTypeUtils {
                     BasicTypeInfo.BIG_DEC_TYPE_INFO.getTypeClass(), 
BigDecSerializer.INSTANCE);
             typeInfoToSerializerMap.put(
                     BasicTypeInfo.BYTE_TYPE_INFO.getTypeClass(), 
ByteSerializer.INSTANCE);
+            typeInfoToSerializerMap.put(
+                    BasicTypeInfo.INSTANT_TYPE_INFO.getTypeClass(), 
InstantSerializer.INSTANCE);
 
             typeInfoToSerializerMap.put(
                     
PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO.getTypeClass(),
@@ -524,6 +540,13 @@ public class PythonTypeUtils {
                                             ((ListTypeInfo<?>) typeInformation)
                                                     .getElementTypeInfo()));
                 }
+
+                if (typeInformation instanceof ExternalTypeInfo) {
+                    return (TypeSerializer<T>)
+                            typeInfoSerializerConverter(
+                                    
LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+                                            ((ExternalTypeInfo<?>) 
typeInformation).getDataType()));
+                }
             }
 
             throw new UnsupportedOperationException(

Reply via email to