dianfu commented on a change in pull request #14401:
URL: https://github.com/apache/flink/pull/14401#discussion_r546606700
##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -91,296 +73,297 @@ def from_internal_type(self, obj):
return obj
-class BasicTypeInfo(TypeInformation, ABC):
+class BasicType(Enum):
+ STRING = "String"
+ BYTE = "Byte"
+ BOOLEAN = "Boolean"
+ SHORT = "Short"
+ INT = "Integer"
+ LONG = "Long"
+ FLOAT = "Float"
+ DOUBLE = "Double"
+ CHAR = "Char"
+ BIG_INT = "BigInteger"
+ BIG_DEC = "BigDecimal"
+
+
+class BasicTypeInfo(TypeInformation):
"""
Type information for primitive types (int, long, double, byte, ...),
String, BigInteger,
and BigDecimal.
"""
+ def __init__(self, basic_type: BasicType):
+ self._basic_type = basic_type
+ super(BasicTypeInfo, self).__init__()
+
+ def get_java_type_info(self) -> JavaObject:
+ JBasicTypeInfo =
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
Review comment:
We can return self._j_typeinfo directly if it's not none.
##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -518,15 +514,24 @@ def from_internal_type(self, t):
hours, minutes = divmod(minutes, 60)
return datetime.time(hours, minutes, seconds, microseconds)
+ def get_java_type_info(self) -> JavaObject:
+ if not self._j_typeinfo:
+ self._j_typeinfo = get_gateway().jvm\
+ .org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME
+ return self._j_typeinfo
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, TimeTypeInfo)
+
+ def __repr__(self) -> str:
+ return "TimeTypeInformation"
Review comment:
```suggestion
return "TimeTypeInfo"
```
##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -91,296 +73,297 @@ def from_internal_type(self, obj):
return obj
-class BasicTypeInfo(TypeInformation, ABC):
+class BasicType(Enum):
+ STRING = "String"
+ BYTE = "Byte"
+ BOOLEAN = "Boolean"
+ SHORT = "Short"
+ INT = "Integer"
+ LONG = "Long"
+ FLOAT = "Float"
+ DOUBLE = "Double"
+ CHAR = "Char"
+ BIG_INT = "BigInteger"
+ BIG_DEC = "BigDecimal"
+
+
+class BasicTypeInfo(TypeInformation):
"""
Type information for primitive types (int, long, double, byte, ...),
String, BigInteger,
and BigDecimal.
"""
+ def __init__(self, basic_type: BasicType):
+ self._basic_type = basic_type
+ super(BasicTypeInfo, self).__init__()
+
+ def get_java_type_info(self) -> JavaObject:
+ JBasicTypeInfo =
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
+ if self._basic_type == BasicType.STRING:
+ self._j_typeinfo = JBasicTypeInfo.STRING_TYPE_INFO
+ elif self._basic_type == BasicType.BYTE:
+ self._j_typeinfo = JBasicTypeInfo.BYTE_TYPE_INFO
+ elif self._basic_type == BasicType.BOOLEAN:
+ self._j_typeinfo = JBasicTypeInfo.BOOLEAN_TYPE_INFO
+ elif self._basic_type == BasicType.SHORT:
+ self._j_typeinfo = JBasicTypeInfo.SHORT_TYPE_INFO
+ elif self._basic_type == BasicType.INT:
+ self._j_typeinfo = JBasicTypeInfo.INT_TYPE_INFO
+ elif self._basic_type == BasicType.LONG:
+ self._j_typeinfo = JBasicTypeInfo.LONG_TYPE_INFO
+ elif self._basic_type == BasicType.FLOAT:
+ self._j_typeinfo = JBasicTypeInfo.FLOAT_TYPE_INFO
+ elif self._basic_type == BasicType.DOUBLE:
+ self._j_typeinfo = JBasicTypeInfo.DOUBLE_TYPE_INFO
+ elif self._basic_type == BasicType.CHAR:
+ self._j_typeinfo = JBasicTypeInfo.CHAR_TYPE_INFO
+ elif self._basic_type == BasicType.BIG_INT:
+ self._j_typeinfo = JBasicTypeInfo.BIG_INT_TYPE_INFO
+ elif self._basic_type == BasicType.BIG_DEC:
+ self._j_typeinfo = JBasicTypeInfo.BIG_DEC_TYPE_INFO
Review comment:
Raise an exception if _basic_type doesn't match any of the above?
##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -484,17 +471,26 @@ def from_internal_type(self, v):
if v is not None:
return datetime.date.fromordinal(v + self.EPOCH_ORDINAL)
+ def get_java_type_info(self) -> JavaObject:
+ if not self._j_typeinfo:
+ self._j_typeinfo = get_gateway().jvm\
+ .org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE
+ return self._j_typeinfo
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, DateTypeInfo)
+
+ def __repr__(self):
+ return "DateTypeInformation"
Review comment:
```suggestion
return "DateTypeInfo"
```
##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -602,57 +668,43 @@ def PRIMITIVE_ARRAY(element_type: TypeInformation):
Returns type information for arrays of primitive type (such as
byte[]). The array must not
be null.
- :param element_type element type of the array (e.g. Types.BOOLEAN(),
Types.INT(),
+ :param element_type: element type of the array (e.g. Types.BOOLEAN(),
Types.INT(),
Types.DOUBLE())
"""
- if element_type == Types.BOOLEAN():
- return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO()
- elif element_type == Types.BYTE():
- return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO()
- elif element_type == Types.SHORT():
- return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO()
- elif element_type == Types.INT():
- return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO()
- elif element_type == Types.LONG():
- return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO()
- elif element_type == Types.FLOAT():
- return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO()
- elif element_type == Types.DOUBLE():
- return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO()
- elif element_type == Types.CHAR():
- return PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
- else:
- raise TypeError("Invalid element type for a primitive array.")
+ return PrimitiveArrayTypeInfo(element_type)
@staticmethod
def BASIC_ARRAY(element_type: TypeInformation) -> TypeInformation:
"""
Returns type information for arrays of boxed primitive type (such as
Integer[]).
- :param element_type element type of the array (e.g. Types.BOOLEAN(),
Types.INT(),
+ :param element_type: element type of the array (e.g. Types.BOOLEAN(),
Types.INT(),
Types.DOUBLE())
"""
- if element_type == Types.BOOLEAN():
- return BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO()
- elif element_type == Types.BYTE():
- return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO()
- elif element_type == Types.SHORT():
- return BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO()
- elif element_type == Types.INT():
- return BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO()
- elif element_type == Types.LONG():
- return BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO()
- elif element_type == Types.FLOAT():
- return BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO()
- elif element_type == Types.DOUBLE():
- return BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO()
- elif element_type == Types.CHAR():
- return BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO()
- elif element_type == Types.STRING():
- return BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO()
- else:
- raise TypeError("Invalid element type for a boxed primitive array:
%s" %
- str(element_type))
+
+ return BasicArrayTypeInfo(element_type)
+
+ @staticmethod
+ def MAP(key_type_info: TypeInformation, value_type_info: TypeInformation)
-> TypeInformation:
+ """
+ Special TypeInformation used by MapStateDescriptor
+
+ :param key_type_info: Element type of key (e.g. Types.BOOLEAN(),
Types.INT(),
+ Types.DOUBLE())
+ :param value_type_info: Element type of value (e.g. Types.BOOLEAN(),
Types.INT(),
+ Types.DOUBLE())
+ """
+ return MapTypeInfo(key_type_info, value_type_info)
+
+ @staticmethod
+ def LIST(element_type_info: TypeInformation) -> TypeInformation:
+ """
+ A TypeInformation for the list type.
+
+ :param element_type_info: The type of the elements in the list
+ :return:
Review comment:
the return statement could be removed.
##########
File path: flink-python/pyflink/common/typeinfo.py
##########
@@ -91,296 +73,297 @@ def from_internal_type(self, obj):
return obj
-class BasicTypeInfo(TypeInformation, ABC):
+class BasicType(Enum):
+ STRING = "String"
+ BYTE = "Byte"
+ BOOLEAN = "Boolean"
+ SHORT = "Short"
+ INT = "Integer"
+ LONG = "Long"
+ FLOAT = "Float"
+ DOUBLE = "Double"
+ CHAR = "Char"
+ BIG_INT = "BigInteger"
+ BIG_DEC = "BigDecimal"
+
+
+class BasicTypeInfo(TypeInformation):
"""
Type information for primitive types (int, long, double, byte, ...),
String, BigInteger,
and BigDecimal.
"""
+ def __init__(self, basic_type: BasicType):
+ self._basic_type = basic_type
+ super(BasicTypeInfo, self).__init__()
+
+ def get_java_type_info(self) -> JavaObject:
+ JBasicTypeInfo =
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
+ if self._basic_type == BasicType.STRING:
+ self._j_typeinfo = JBasicTypeInfo.STRING_TYPE_INFO
+ elif self._basic_type == BasicType.BYTE:
+ self._j_typeinfo = JBasicTypeInfo.BYTE_TYPE_INFO
+ elif self._basic_type == BasicType.BOOLEAN:
+ self._j_typeinfo = JBasicTypeInfo.BOOLEAN_TYPE_INFO
+ elif self._basic_type == BasicType.SHORT:
+ self._j_typeinfo = JBasicTypeInfo.SHORT_TYPE_INFO
+ elif self._basic_type == BasicType.INT:
+ self._j_typeinfo = JBasicTypeInfo.INT_TYPE_INFO
+ elif self._basic_type == BasicType.LONG:
+ self._j_typeinfo = JBasicTypeInfo.LONG_TYPE_INFO
+ elif self._basic_type == BasicType.FLOAT:
+ self._j_typeinfo = JBasicTypeInfo.FLOAT_TYPE_INFO
+ elif self._basic_type == BasicType.DOUBLE:
+ self._j_typeinfo = JBasicTypeInfo.DOUBLE_TYPE_INFO
+ elif self._basic_type == BasicType.CHAR:
+ self._j_typeinfo = JBasicTypeInfo.CHAR_TYPE_INFO
+ elif self._basic_type == BasicType.BIG_INT:
+ self._j_typeinfo = JBasicTypeInfo.BIG_INT_TYPE_INFO
+ elif self._basic_type == BasicType.BIG_DEC:
+ self._j_typeinfo = JBasicTypeInfo.BIG_DEC_TYPE_INFO
+ return self._j_typeinfo
+
+ def __eq__(self, o) -> bool:
+ if isinstance(o, BasicTypeInfo):
+ return self._basic_type == o._basic_type
+ return False
+
+ def __repr__(self):
+ return self._basic_type.value
+
@staticmethod
def STRING_TYPE_INFO():
- return WrapperTypeInfo(get_gateway().jvm
-
.org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO)
+ return BasicTypeInfo(BasicType.STRING)
@staticmethod
def BOOLEAN_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO)
+ return BasicTypeInfo(BasicType.BOOLEAN)
@staticmethod
def BYTE_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO)
+ return BasicTypeInfo(BasicType.BYTE)
@staticmethod
def SHORT_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO)
+ return BasicTypeInfo(BasicType.SHORT)
@staticmethod
def INT_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO)
+ return BasicTypeInfo(BasicType.INT)
@staticmethod
def LONG_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO)
+ return BasicTypeInfo(BasicType.LONG)
@staticmethod
def FLOAT_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO)
+ return BasicTypeInfo(BasicType.FLOAT)
@staticmethod
def DOUBLE_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO)
+ return BasicTypeInfo(BasicType.DOUBLE)
@staticmethod
def CHAR_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.CHAR_TYPE_INFO)
+ return BasicTypeInfo(BasicType.CHAR)
@staticmethod
def BIG_INT_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_INT_TYPE_INFO)
+ return BasicTypeInfo(BasicType.BIG_INT)
@staticmethod
def BIG_DEC_TYPE_INFO():
- return WrapperTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO)
+ return BasicTypeInfo(BasicType.BIG_DEC)
-class SqlTimeTypeInfo(TypeInformation, ABC):
+class SqlTimeTypeInfo(TypeInformation):
"""
SqlTimeTypeInfo enables users to get Sql Time TypeInfo.
"""
@staticmethod
def DATE():
- return DateTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.DATE)
+ return DateTypeInfo()
@staticmethod
def TIME():
- return TimeTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIME)
+ return TimeTypeInfo()
@staticmethod
def TIMESTAMP():
- return TimestampTypeInfo(
-
get_gateway().jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP)
+ return TimestampTypeInfo()
-class PrimitiveArrayTypeInfo(WrapperTypeInfo, ABC):
+class PrimitiveArrayTypeInfo(TypeInformation):
"""
A TypeInformation for arrays of primitive types (int, long, double, ...).
Supports the creation of dedicated efficient serializers for these types.
"""
- @staticmethod
- def BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
-
- @staticmethod
- def BYTE_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+ def __init__(self, element_type: TypeInformation):
+ self._element_type = element_type
+ super(PrimitiveArrayTypeInfo, self).__init__()
- @staticmethod
- def SHORT_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO)
-
- @staticmethod
- def INT_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+ def get_java_type_info(self) -> JavaObject:
+ if not self._j_typeinfo:
+ JPrimitiveArrayTypeInfo =
get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+ .PrimitiveArrayTypeInfo
+ if self._element_type == Types.BOOLEAN():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.BYTE():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.SHORT():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.INT():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.LONG():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.FLOAT():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.DOUBLE():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.CHAR():
+ self._j_typeinfo =
JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
+ else:
+ raise TypeError("Invalid element type for a primitive array.")
+ return self._j_typeinfo
- @staticmethod
- def LONG_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+ def __eq__(self, o) -> bool:
+ if isinstance(o, PrimitiveArrayTypeInfo):
+ return self._element_type == o._element_type
+ return False
- @staticmethod
- def FLOAT_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO)
+ def __repr__(self) -> str:
+ return "PrimitiveArrayTypeInformation<%s>" % self._element_type
- @staticmethod
- def DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO)
- @staticmethod
- def CHAR_PRIMITIVE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO)
-
-
-def is_primitive_array_type_info(type_info: TypeInformation):
- return type_info in {
- PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO(),
- PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO(),
- PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO(),
- PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO(),
- PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO(),
- PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO(),
- PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO(),
- PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()
- }
-
-
-class BasicArrayTypeInfo(WrapperTypeInfo, ABC):
+class BasicArrayTypeInfo(TypeInformation):
"""
A TypeInformation for arrays of boxed primitive types (Integer, Long,
Double, ...).
Supports the creation of dedicated efficient serializers for these types.
"""
- @staticmethod
- def BOOLEAN_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)
-
- @staticmethod
- def BYTE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
- @staticmethod
- def SHORT_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)
+ def __init__(self, element_type: TypeInformation):
+ self._element_type = element_type
+ super(BasicArrayTypeInfo, self).__init__()
- @staticmethod
- def INT_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
+ def get_java_type_info(self) -> JavaObject:
+ if not self._j_typeinfo:
+ JBasicArrayTypeInfo =
get_gateway().jvm.org.apache.flink.api.common.typeinfo \
+ .BasicArrayTypeInfo
+ if self._element_type == Types.BOOLEAN():
+ self._j_typeinfo = JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
+ elif self._element_type == Types.BYTE():
+ self._j_typeinfo = JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.SHORT():
+ self._j_typeinfo = JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
+ elif self._element_type == Types.INT():
+ self._j_typeinfo = JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+ elif self._element_type == Types.LONG():
+ self._j_typeinfo = JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+ elif self._element_type == Types.FLOAT():
+ self._j_typeinfo = JBasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
+ elif self._element_type == Types.DOUBLE():
+ self._j_typeinfo = JBasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
+ elif self._element_type == Types.CHAR():
+ self._j_typeinfo = JBasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
+ elif self._element_type == Types.STRING():
+ self._j_typeinfo = JBasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+ else:
+ raise TypeError("Invalid element type for a primitive array.")
- @staticmethod
- def LONG_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)
+ return self._j_typeinfo
- @staticmethod
- def FLOAT_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)
+ def __eq__(self, o) -> bool:
+ if isinstance(o, BasicArrayTypeInfo):
+ return self._element_type == o._element_type
+ return False
- @staticmethod
- def DOUBLE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)
+ def __repr__(self):
+ return "BasicArrayTypeInformation<%s>" % self._element_type
- @staticmethod
- def CHAR_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO)
- @staticmethod
- def STRING_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(
- get_gateway().jvm.org.apache.flink.api.common.typeinfo
- .BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
-
-
-def is_basic_array_type_info(type_info: TypeInformation):
- return type_info in {
- BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO(),
- BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO()
- }
-
-
-class PickledBytesTypeInfo(WrapperTypeInfo, ABC):
+class PickledBytesTypeInfo(TypeInformation):
"""
A PickledBytesTypeInfo indicates the data is a primitive byte array
generated by pickle
serializer.
"""
- @staticmethod
- def PICKLED_BYTE_ARRAY_TYPE_INFO():
- return
WrapperTypeInfo(get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python
-
.PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO)
+ def get_java_type_info(self) -> JavaObject:
+ if not self._j_typeinfo:
+ self._j_typeinfo =
get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python\
+ .PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO
+ return self._j_typeinfo
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, PickledBytesTypeInfo)
+ def __repr__(self):
+ return "PickledByteArrayTypeInformation"
Review comment:
```suggestion
return "PickledBytesTypeInfo"
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]