dianfu commented on a change in pull request #13327:
URL: https://github.com/apache/flink/pull/13327#discussion_r494116254



##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -384,6 +444,35 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation):
         else:
             raise TypeError("Invalid element type for a primitive array.")
 
+    @staticmethod
+    def BASIC_ARRAY(element_type: TypeInformation):
+        """
+        Returns type information for arrays of boxed primitive type (such as 
Integer[]).
+
+        :param element_type element type of the array (e.g. Types.BOOLEAN(), 
Types.INT(),
+        Types.DOUBLE())
+        """
+        if element_type == Types.BOOLEAN():
+            return BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO()
+        elif element_type == Types.BYTE():
+            return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO()
+        elif element_type == Types.SHORT():
+            return BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO()
+        elif element_type == Types.INT():
+            return BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO()
+        elif element_type == Types.LONG():
+            return BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO()
+        elif element_type == Types.FLOAT():
+            return BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO()
+        elif element_type == Types.DOUBLE():
+            return BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO()
+        elif element_type == Types.CHAR():
+            return BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO()
+        elif element_type == Types.STRING():
+            return BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO()
+        else:
+            raise TypeError("Invalid element type for a boxed primitive 
array.")

Review comment:
       ```suggestion
               raise TypeError("Invalid element type for a boxed primitive 
array: %s" % str(element_type))
   ```

##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -32,7 +32,7 @@
 __all__ = ['RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder',
            'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder',
            'BinaryCoder', 'CharCoder', 'DateCoder', 'TimeCoder',
-           'TimestampCoder', 'ArrayCoder', 'MapCoder', 'DecimalCoder']
+           'TimestampCoder', 'BasicArrayCoder', 'MapCoder', 'DecimalCoder']

Review comment:
       Also expose PrimitiveArrayCoder?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -194,7 +194,7 @@ def _to_data_type(field_type):
                                                field_type.nullable)
             elif field_type.type_name == 
flink_fn_execution_pb2.Schema.TIMESTAMP:
                 return TimestampType(field_type.timestamp_info.precision, 
field_type.nullable)
-            elif field_type.type_name == flink_fn_execution_pb2.Schema.ARRAY:

Review comment:
       should we also add PRIMITIVE_ARRAY?

##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -467,8 +478,8 @@ def from_proto(field_type):
     if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP:
         timezone = pytz.timezone(os.environ['table.exec.timezone'])
         return 
LocalZonedTimestampCoder(field_type.local_zoned_timestamp_info.precision, 
timezone)
-    elif field_type_name == type_name.ARRAY:
-        return ArrayCoder(from_proto(field_type.collection_element_type))
+    elif field_type_name == type_name.BASIC_ARRAY:

Review comment:
       Should we also support PRIMITIVE_ARRAY here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to