dianfu commented on a change in pull request #16069:
URL: https://github.com/apache/flink/pull/16069#discussion_r658592925



##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -635,3 +663,57 @@ def from_type_info_proto(type_info):
             return BasicArrayCoder(from_type_info_proto(type_info.collection_element_type))
         else:
             raise ValueError("Unsupported type_info %s." % type_info)
+
+
+_basic_type_info_mappings = {
+    BasicType.BYTE: TinyIntCoder(),
+    BasicType.BOOLEAN: BooleanCoder(),
+    BasicType.SHORT: SmallIntCoder(),
+    BasicType.INT: IntCoder(),
+    BasicType.LONG: BigIntCoder(),
+    BasicType.BIG_INT: BigIntCoder(),
+    BasicType.FLOAT: FloatCoder(),
+    BasicType.DOUBLE: DoubleCoder(),
+    BasicType.STRING: CharCoder(),
+    BasicType.CHAR: CharCoder(),
+    BasicType.BIG_DEC: BigDecimalCoder(),
+}
+
+
+def from_type_info(type_info: TypeInformation) -> FieldCoder:
+    """
+    Mappings from type_info to Coder
+    """
+
+    if isinstance(type_info, PickledBytesTypeInfo):
+        return PickleCoder()
+    elif isinstance(type_info, BasicTypeInfo):
+        return _basic_type_info_mappings[type_info._basic_type]
+    elif isinstance(type_info, DateTypeInfo):
+        return DateCoder()
+    elif isinstance(type_info, TimeTypeInfo):
+        return TimeCoder()
+    elif isinstance(type_info, TimestampTypeInfo):
+        return TimestampCoder(3)
+    elif isinstance(type_info, PrimitiveArrayTypeInfo):
+        element_type = type_info._element_type
+        if isinstance(element_type, BasicTypeInfo) and element_type._basic_type == BasicType.BYTE:
+            return BinaryCoder()
+        else:
+            return PrimitiveArrayCoder(from_type_info(element_type))
+    elif isinstance(type_info, BasicArrayTypeInfo):

Review comment:
       Should also consider ObjectArrayTypeInfo

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1136,7 +1136,7 @@ def get_execution_environment(self):
         return self._keyed_stream.get_execution_environment()
 
     def get_input_type(self):
-        return self._keyed_stream.get_type()
+        return _from_java_type(self._keyed_stream._original_data_type_info.get_java_type_info())

Review comment:
       What's the purpose of this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to