snuyanzin commented on code in PR #28284: URL: https://github.com/apache/flink/pull/28284#discussion_r3334942475
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeFactory.java: ########## @@ -0,0 +1,880 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NothingTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.calcite.ExtendedRelTypeFactory; +import org.apache.flink.table.legacy.api.TableSchema; +import org.apache.flink.table.legacy.types.logical.TypeInformationRawType; +import org.apache.flink.table.planner.plan.schema.BitmapRelDataType; +import org.apache.flink.table.planner.plan.schema.GenericRelDataType; +import org.apache.flink.table.planner.plan.schema.RawRelDataType; +import org.apache.flink.table.planner.plan.schema.StructuredRelDataType; +import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; +import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter; +import org.apache.flink.table.runtime.types.PlannerTypeUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BitmapType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DescriptorType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.VariantType; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.rel.type.StructKind; +import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.MapSqlType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ConversionUtil; + +import java.lang.reflect.Type; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; +import static org.apache.calcite.sql.type.SqlTypeName.TIMESTAMP; +import static org.apache.calcite.sql.type.SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE; +import static org.apache.calcite.sql.type.SqlTypeName.VARBINARY; + +/** + * Flink specific type factory that represents the interface between Flink's {@link LogicalType} and + * Calcite's {@link RelDataType}. + */ +public class FlinkTypeFactory extends JavaTypeFactoryImpl implements ExtendedRelTypeFactory { + private final Map<LogicalType, RelDataType> seenTypes = new HashMap<>(); + private final ClassLoader classLoader; + + public FlinkTypeFactory(ClassLoader classLoader, RelDataTypeSystem typeSystem) { + super(typeSystem); + this.classLoader = classLoader; + } + + public FlinkTypeFactory(ClassLoader classLoader) { + this(classLoader, FlinkTypeSystem.INSTANCE); + } + + @Override + public RelDataType createRawType(String className, String serializerString) { + final RawType rawType = RawType.restore(classLoader, className, serializerString); + final RelDataType rawRelDataType = createFieldTypeFromLogicalType(rawType); + return canonize(rawRelDataType); + } + + @Override + public RelDataType createStructuredType( + String className, List<RelDataType> fieldTypes, List<String> fieldNames) { + final Optional<Class<?>> resolvedClass = + StructuredType.resolveClass(classLoader, className); + final StructuredType.Builder builder = + resolvedClass + .map(StructuredType::newBuilder) + .orElseGet(() -> StructuredType.newBuilder(className)); + + final List<RelDataTypeField> relFields = new ArrayList<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + relFields.add(new RelDataTypeFieldImpl(fieldNames.get(i), i, fieldTypes.get(i))); + } + + builder.attributes( + relFields.stream() + .map( + f -> + new StructuredType.StructuredAttribute( + f.getName(), toLogicalType(f.getType()))) + .collect(Collectors.toList())); + + final RelDataType relDataType = new StructuredRelDataType(builder.build(), relFields); + return canonize(relDataType); + } + + @Override + public RelDataType createBitmapType() { + return canonize(new BitmapRelDataType(new BitmapType())); + } + + @Override + public RelDataType createArrayType(RelDataType elementType, long maxCardinality) { + // Just validate type, make sure there is a failure in validate phase. + checkForNullType(elementType); + toLogicalType(elementType); + return super.createArrayType(elementType, maxCardinality); + } + + @Override + public RelDataType createMapType(RelDataType keyType, RelDataType valueType) { + // Just validate type, make sure there is a failure in validate phase. + checkForNullType(keyType, valueType); + toLogicalType(keyType); + toLogicalType(valueType); + return super.createMapType(keyType, valueType); + } + + @Override + public RelDataType createMultisetType(RelDataType elementType, long maxCardinality) { + // Just validate type, make sure there is a failure in validate phase. + checkForNullType(elementType); + toLogicalType(elementType); + return super.createMultisetType(elementType, maxCardinality); + } + + @Override + public Type getJavaClass(RelDataType type) { + if (type.getSqlTypeName() == SqlTypeName.FLOAT) { + return type.isNullable() ? Float.class : Float.TYPE; + } + return super.getJavaClass(type); + } + + @Override + public RelDataType createSqlType(SqlTypeName typeName) { + if (typeName == SqlTypeName.DECIMAL) { + // if we got here, the precision and scale are not specified, here we + // keep precision/scale in sync with our type system's default value, + // see DecimalType.USER_DEFAULT. + return createSqlType( + typeName, DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE); + } + return super.createSqlType(typeName); + } + + @Override + public RelDataType createSqlType(SqlTypeName typeName, int precision) { + // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue + // Calcite will limit the length of the VARCHAR type to 65536. + if (typeName == SqlTypeName.VARCHAR && precision < 0) { + return createSqlType(typeName, getTypeSystem().getDefaultPrecision(typeName)); + } + return super.createSqlType(typeName, precision); + } + + @Override + public RelDataType createTypeWithNullability(RelDataType relDataType, boolean isNullable) { + // nullability change not necessary + if (relDataType.isNullable() == isNullable) { + return canonize(relDataType); + } + + // change nullability + final RelDataType newType; + if (relDataType instanceof RawRelDataType) { + newType = ((RawRelDataType) relDataType).createWithNullability(isNullable); + } else if (relDataType instanceof StructuredRelDataType) { + newType = ((StructuredRelDataType) relDataType).createWithNullability(isNullable); + } else if (relDataType instanceof BitmapRelDataType) { + newType = ((BitmapRelDataType) relDataType).createWithNullability(isNullable); + } else if (relDataType instanceof GenericRelDataType) { + final GenericRelDataType generic = (GenericRelDataType) relDataType; + newType = new GenericRelDataType(generic.genericType(), isNullable, getTypeSystem()); + } else if (relDataType instanceof TimeIndicatorRelDataType) { + final TimeIndicatorRelDataType it = (TimeIndicatorRelDataType) relDataType; + newType = + new TimeIndicatorRelDataType( + it.typeSystemField(), it.originalType(), isNullable, it.isEventTime()); + } else if (relDataType instanceof RelRecordType + && ((RelRecordType) relDataType).getStructKind() + == StructKind.PEEK_FIELDS_NO_EXPAND) { + // for nested rows we keep the nullability property, + // top-level rows fall back to Calcite's default handling + final RelRecordType rt = (RelRecordType) relDataType; + newType = new RelRecordType(rt.getStructKind(), rt.getFieldList(), isNullable); + } else { + newType = super.createTypeWithNullability(relDataType, isNullable); + } + + return canonize(newType); + } + + @Override + public RelDataType leastRestrictive(List<RelDataType> types) { + final Optional<RelDataType> resolved = resolveAllIdenticalTypes(types); + final RelDataType leastRestrictive = + resolved.isPresent() ? resolved.get() : super.leastRestrictive(types); + // NULL is reserved for untyped literals only + if (leastRestrictive == null || leastRestrictive.getSqlTypeName() == SqlTypeName.NULL) { + return null; + } + return leastRestrictive; + } + + private Optional<RelDataType> resolveAllIdenticalTypes(List<RelDataType> types) { + final RelDataType head = types.get(0); + // check if all types are the same + if (types.stream().allMatch(t -> t.equals(head))) { + // types are the same, check nullability + final boolean nullable = + types.stream() + .anyMatch( + sqlType -> + sqlType.isNullable() + || sqlType.getSqlTypeName() + == SqlTypeName.NULL); + // return type with nullability + return Optional.of(createTypeWithNullability(head, nullable)); + } else { + // types are not all the same + if (types.stream().anyMatch(t -> t.getSqlTypeName() == SqlTypeName.ANY)) { + // one of the type was RAW. + // we cannot generate a common type if it differs from other types. + throw new TableException("Generic RAW types must have a common type information."); + } else { + // cannot resolve a common type for different input types + return Optional.empty(); + } + } + } + + @Override + public Charset getDefaultCharset() { + return Charset.forName(ConversionUtil.NATIVE_UTF16_CHARSET_NAME); + } + + /** + * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory. + * + * @param fieldNames field names + * @param fieldTypes field types, every element is Flink's {@link LogicalType}. + * @param structKind Name resolution policy. See more information in {@link StructKind}. + * @return a struct type with the input fieldNames, input fieldTypes. + */ + private RelDataType buildStructType( + String[] fieldNames, LogicalType[] fieldTypes, StructKind structKind) { + FieldInfoBuilder b = builder(); + b.kind(structKind); + // mirror Scala's `fieldNames.zip(fieldTypes)`, which truncates to the shorter sequence Review Comment: sure, done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
