JingsongLi commented on a change in pull request #8435:
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
URL: https://github.com/apache/flink/pull/8435#discussion_r290582852
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
##########
@@ -406,78 +416,76 @@ object FlinkTypeFactory {
case _ => false
}
- def toInternalType(relDataType: RelDataType): InternalType =
relDataType.getSqlTypeName match {
- case BOOLEAN => InternalTypes.BOOLEAN
- case TINYINT => InternalTypes.BYTE
- case SMALLINT => InternalTypes.SHORT
- case INTEGER => InternalTypes.INT
- case BIGINT => InternalTypes.LONG
- case FLOAT => InternalTypes.FLOAT
- case DOUBLE => InternalTypes.DOUBLE
- case VARCHAR | CHAR => InternalTypes.STRING
- case VARBINARY | BINARY => InternalTypes.BINARY
- case DECIMAL => InternalTypes.createDecimalType(relDataType.getPrecision, relDataType.getScale)
-
- // time indicators
- case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
- val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
- if (indicator.isEventTime) {
- InternalTypes.ROWTIME_INDICATOR
- } else {
- InternalTypes.PROCTIME_INDICATOR
- }
-
- // temporal types
- case DATE => InternalTypes.DATE
- case TIME => InternalTypes.TIME
- case TIMESTAMP => InternalTypes.TIMESTAMP
- case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MONTHS
- case typeName if DAY_INTERVAL_TYPES.contains(typeName) => InternalTypes.INTERVAL_MILLIS
-
- case NULL =>
- throw new TableException(
- "Type NULL is not supported. Null values must have a supported type.")
-
- // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
- // are represented as Enum
- case SYMBOL => InternalTypes.createGenericType(classOf[Enum[_]])
-
- // extract encapsulated Type
- case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
- val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
- genericRelDataType.genericType
-
- case ROW if relDataType.isInstanceOf[RowRelDataType] =>
- val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
- compositeRelDataType.rowType
-
- case ROW if relDataType.isInstanceOf[RelRecordType] =>
- val relRecordType = relDataType.asInstanceOf[RelRecordType]
- new RowSchema(relRecordType).internalType
-
- case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
- val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
- multisetRelDataType.multisetType
-
- case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
- val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
- arrayRelDataType.arrayType
-
- case MAP if relDataType.isInstanceOf[MapRelDataType] =>
- val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
- mapRelDataType.mapType
+ def toLogicalType(relDataType: RelDataType): LogicalType = {
+ val logicalType = relDataType.getSqlTypeName match {
+ case BOOLEAN => new BooleanType()
+ case TINYINT => new TinyIntType()
+ case SMALLINT => new SmallIntType()
+ case INTEGER => new IntType()
+ case BIGINT => new BigIntType()
+ case FLOAT => new FloatType()
+ case DOUBLE => new DoubleType()
+ case VARCHAR | CHAR => new VarCharType(VarCharType.MAX_LENGTH)
+ case VARBINARY | BINARY => new VarBinaryType(VarBinaryType.MAX_LENGTH)
+ case DECIMAL => new DecimalType(relDataType.getPrecision, relDataType.getScale)
+
+ // time indicators
+ case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
+ val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
+ if (indicator.isEventTime) {
+ new TimestampType(true, TimestampKind.ROWTIME, 3)
+ } else {
+ new TimestampType(true, TimestampKind.PROCTIME, 3)
+ }
- case _@t =>
- throw new TableException(s"Type is not supported: $t")
+ // temporal types
+ case DATE => new DateType()
+ case TIME => new TimeType()
+ case TIMESTAMP => new TimestampType(3)
+ case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
+ DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
+ case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
+ DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType
+
+ case NULL =>
+ throw new TableException(
+ "Type NULL is not supported. Null values must have a supported type.")
+
+ // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+ // are represented as Enum
+ case SYMBOL => toPlannerLogicalType(PlannerTypeConversions.toDataType(classOf[Enum[_]]))
+
+ // extract encapsulated Type
+ case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
+ val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
+ genericRelDataType.genericType
+
+ case ROW if relDataType.isInstanceOf[RowRelDataType] =>
+ val compositeRelDataType = relDataType.asInstanceOf[RowRelDataType]
+ compositeRelDataType.rowType
+
+ case ROW if relDataType.isInstanceOf[RelRecordType] =>
+ toInternalRowType(relDataType.asInstanceOf[RelRecordType])
+
+ case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
+ val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
+ multisetRelDataType.multisetType
+
+ case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
+ val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
+ arrayRelDataType.arrayType
+
+ case MAP if relDataType.isInstanceOf[MapRelDataType] =>
+ val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
+ mapRelDataType.mapType
+
+ case _@t =>
+ throw new TableException(s"Type is not supported: $t")
+ }
+ logicalType.copy(relDataType.isNullable)
}
def toInternalRowType(logicalRowType: RelDataType): RowType = {
- // convert to InternalType
- val logicalFieldTypes = logicalRowType.getFieldList.asScala map {
- relDataType => FlinkTypeFactory.toInternalType(relDataType.getType)
- }
- // field names
- val logicalFieldNames = logicalRowType.getFieldNames.asScala
- new RowType(logicalFieldTypes.toArray[InternalType], logicalFieldNames.toArray)
+ new RowSchema(logicalRowType).internalType
Review comment:
I'll delete it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services