This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 50ff839d0d6 [FLINK-39812][table] Migrate `FlinkTypeFactory` to java
50ff839d0d6 is described below
commit 50ff839d0d681160797468bbcf2bf0c917dd882a
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jun 1 23:06:25 2026 +0200
[FLINK-39812][table] Migrate `FlinkTypeFactory` to java
---
.../table/planner/calcite/FlinkTypeFactory.java | 877 +++++++++++++++++++++
.../BatchPhysicalLocalRuntimeFilterBuilder.java | 5 +-
.../table/planner/calcite/FlinkTypeFactory.scala | 723 -----------------
.../planner/codegen/AsyncCodeGeneratorTest.java | 17 +-
.../codegen/AsyncCorrelateCodeGeneratorTest.java | 9 +-
.../table/planner/plan/utils/AsyncUtilTest.java | 11 +-
.../metadata/AggCallSelectivityEstimatorTest.scala | 2 +-
7 files changed, 889 insertions(+), 755 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeFactory.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeFactory.java
new file mode 100644
index 00000000000..fe18a71410e
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeFactory.java
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.calcite.ExtendedRelTypeFactory;
+import org.apache.flink.table.legacy.api.TableSchema;
+import org.apache.flink.table.legacy.types.logical.TypeInformationRawType;
+import org.apache.flink.table.planner.plan.schema.BitmapRelDataType;
+import org.apache.flink.table.planner.plan.schema.GenericRelDataType;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
+import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.types.PlannerTypeUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BitmapType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DescriptorType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.MapSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ConversionUtil;
+
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
+import static org.apache.calcite.sql.type.SqlTypeName.TIMESTAMP;
+import static
org.apache.calcite.sql.type.SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.calcite.sql.type.SqlTypeName.VARBINARY;
+
+/**
+ * Flink specific type factory that represents the interface between Flink's
{@link LogicalType} and
+ * Calcite's {@link RelDataType}.
+ */
+public class FlinkTypeFactory extends JavaTypeFactoryImpl implements
ExtendedRelTypeFactory {
+ private final Map<LogicalType, RelDataType> seenTypes = new HashMap<>();
+ private final ClassLoader classLoader;
+
+ public FlinkTypeFactory(ClassLoader classLoader, RelDataTypeSystem
typeSystem) {
+ super(typeSystem);
+ this.classLoader = classLoader;
+ }
+
+ public FlinkTypeFactory(ClassLoader classLoader) {
+ this(classLoader, FlinkTypeSystem.INSTANCE);
+ }
+
+ @Override
+ public RelDataType createRawType(String className, String
serializerString) {
+ final RawType rawType = RawType.restore(classLoader, className,
serializerString);
+ final RelDataType rawRelDataType =
createFieldTypeFromLogicalType(rawType);
+ return canonize(rawRelDataType);
+ }
+
+ @Override
+ public RelDataType createStructuredType(
+ String className, List<RelDataType> fieldTypes, List<String>
fieldNames) {
+ final Optional<Class<?>> resolvedClass =
+ StructuredType.resolveClass(classLoader, className);
+ final StructuredType.Builder builder =
+ resolvedClass
+ .map(StructuredType::newBuilder)
+ .orElseGet(() -> StructuredType.newBuilder(className));
+
+ final List<RelDataTypeField> relFields = new ArrayList<>();
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ relFields.add(new RelDataTypeFieldImpl(fieldNames.get(i), i,
fieldTypes.get(i)));
+ }
+
+ builder.attributes(
+ relFields.stream()
+ .map(
+ f ->
+ new StructuredType.StructuredAttribute(
+ f.getName(),
toLogicalType(f.getType())))
+ .collect(Collectors.toList()));
+
+ final RelDataType relDataType = new
StructuredRelDataType(builder.build(), relFields);
+ return canonize(relDataType);
+ }
+
+ @Override
+ public RelDataType createBitmapType() {
+ return canonize(new BitmapRelDataType(new BitmapType()));
+ }
+
+ @Override
+ public RelDataType createArrayType(RelDataType elementType, long
maxCardinality) {
+ // Just validate type, make sure there is a failure in validate phase.
+ checkForNullType(elementType);
+ toLogicalType(elementType);
+ return super.createArrayType(elementType, maxCardinality);
+ }
+
+ @Override
+ public RelDataType createMapType(RelDataType keyType, RelDataType
valueType) {
+ // Just validate type, make sure there is a failure in validate phase.
+ checkForNullType(keyType, valueType);
+ toLogicalType(keyType);
+ toLogicalType(valueType);
+ return super.createMapType(keyType, valueType);
+ }
+
+ @Override
+ public RelDataType createMultisetType(RelDataType elementType, long
maxCardinality) {
+ // Just validate type, make sure there is a failure in validate phase.
+ checkForNullType(elementType);
+ toLogicalType(elementType);
+ return super.createMultisetType(elementType, maxCardinality);
+ }
+
+ @Override
+ public Type getJavaClass(RelDataType type) {
+ if (type.getSqlTypeName() == SqlTypeName.FLOAT) {
+ return type.isNullable() ? Float.class : Float.TYPE;
+ }
+ return super.getJavaClass(type);
+ }
+
+ @Override
+ public RelDataType createSqlType(SqlTypeName typeName) {
+ if (typeName == SqlTypeName.DECIMAL) {
+ // if we got here, the precision and scale are not specified, here
we
+ // keep precision/scale in sync with our type system's default
value,
+ // see DecimalType.USER_DEFAULT.
+ return createSqlType(
+ typeName, DecimalType.DEFAULT_PRECISION,
DecimalType.DEFAULT_SCALE);
+ }
+ return super.createSqlType(typeName);
+ }
+
+ @Override
+ public RelDataType createSqlType(SqlTypeName typeName, int precision) {
+ // it might happen that inferred VARCHAR types overflow as we set them
to Int.MaxValue
+ // Calcite will limit the length of the VARCHAR type to 65536.
+ if (typeName == SqlTypeName.VARCHAR && precision < 0) {
+ return createSqlType(typeName,
getTypeSystem().getDefaultPrecision(typeName));
+ }
+ return super.createSqlType(typeName, precision);
+ }
+
+ @Override
+ public RelDataType createTypeWithNullability(RelDataType relDataType,
boolean isNullable) {
+ // nullability change not necessary
+ if (relDataType.isNullable() == isNullable) {
+ return canonize(relDataType);
+ }
+
+ // change nullability
+ final RelDataType newType;
+ if (relDataType instanceof RawRelDataType) {
+ newType = ((RawRelDataType)
relDataType).createWithNullability(isNullable);
+ } else if (relDataType instanceof StructuredRelDataType) {
+ newType = ((StructuredRelDataType)
relDataType).createWithNullability(isNullable);
+ } else if (relDataType instanceof BitmapRelDataType) {
+ newType = ((BitmapRelDataType)
relDataType).createWithNullability(isNullable);
+ } else if (relDataType instanceof GenericRelDataType) {
+ final GenericRelDataType generic = (GenericRelDataType)
relDataType;
+ newType = new GenericRelDataType(generic.genericType(),
isNullable, getTypeSystem());
+ } else if (relDataType instanceof TimeIndicatorRelDataType) {
+ final TimeIndicatorRelDataType it = (TimeIndicatorRelDataType)
relDataType;
+ newType =
+ new TimeIndicatorRelDataType(
+ it.typeSystemField(), it.originalType(),
isNullable, it.isEventTime());
+ } else if (relDataType instanceof RelRecordType
+ && ((RelRecordType) relDataType).getStructKind()
+ == StructKind.PEEK_FIELDS_NO_EXPAND) {
+ // for nested rows we keep the nullability property,
+ // top-level rows fall back to Calcite's default handling
+ final RelRecordType rt = (RelRecordType) relDataType;
+ newType = new RelRecordType(rt.getStructKind(), rt.getFieldList(),
isNullable);
+ } else {
+ newType = super.createTypeWithNullability(relDataType, isNullable);
+ }
+
+ return canonize(newType);
+ }
+
+ @Override
+ public RelDataType leastRestrictive(List<RelDataType> types) {
+ final Optional<RelDataType> resolved = resolveAllIdenticalTypes(types);
+ final RelDataType leastRestrictive =
+ resolved.orElseGet(() -> super.leastRestrictive(types));
+ // NULL is reserved for untyped literals only
+ if (leastRestrictive == null || leastRestrictive.getSqlTypeName() ==
SqlTypeName.NULL) {
+ return null;
+ }
+ return leastRestrictive;
+ }
+
+ private Optional<RelDataType> resolveAllIdenticalTypes(List<RelDataType>
types) {
+ final RelDataType head = types.get(0);
+ // check if all types are the same
+ if (types.stream().allMatch(t -> t.equals(head))) {
+ // types are the same, check nullability
+ final boolean nullable =
+ types.stream()
+ .anyMatch(
+ sqlType ->
+ sqlType.isNullable()
+ || sqlType.getSqlTypeName()
+ ==
SqlTypeName.NULL);
+ // return type with nullability
+ return Optional.of(createTypeWithNullability(head, nullable));
+ } else {
+ // types are not all the same
+ if (types.stream().anyMatch(t -> t.getSqlTypeName() ==
SqlTypeName.ANY)) {
+ // one of the type was RAW.
+ // we cannot generate a common type if it differs from other
types.
+ throw new TableException("Generic RAW types must have a common
type information.");
+ } else {
+ // cannot resolve a common type for different input types
+ return Optional.empty();
+ }
+ }
+ }
+
+ @Override
+ public Charset getDefaultCharset() {
+ return Charset.forName(ConversionUtil.NATIVE_UTF16_CHARSET_NAME);
+ }
+
+ /**
+ * Creates a struct type with the input fieldNames and input fieldTypes
using FlinkTypeFactory.
+ *
+ * @param fieldNames field names
+ * @param fieldTypes field types, every element is Flink's {@link
LogicalType}.
+ * @param structKind Name resolution policy. See more information in
{@link StructKind}.
+ * @return a struct type with the input fieldNames, input fieldTypes.
+ */
+ private RelDataType buildStructType(
+ String[] fieldNames, LogicalType[] fieldTypes, StructKind
structKind) {
+ RelDataTypeFactory.Builder b = builder();
+ b.kind(structKind);
+ for (int i = 0; i < fieldTypes.length; i++) {
+ final LogicalType fieldType = fieldTypes[i];
+ final String fieldName = fieldNames[i];
+ final RelDataType fieldRelDataType =
createFieldTypeFromLogicalType(fieldType);
+ checkForNullType(fieldRelDataType);
+ b.add(fieldName, fieldRelDataType);
+ }
+ return b.build();
+ }
+
+ /**
+ * Create a calcite field type in table schema from {@link LogicalType}.
It uses
+ * PEEK_FIELDS_NO_EXPAND when type is a nested struct type (Flink {@link
RowType}).
+ *
+ * @param type Flink logical type.
+ * @return calcite {@link RelDataType}.
+ */
+ public RelDataType createFieldTypeFromLogicalType(LogicalType type) {
+
+ // Kind in TimestampType do not affect the hashcode and equals, So we
can't put it to
+ // seenTypes
+ final RelDataType relType;
+ switch (type.getTypeRoot()) {
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final TimestampType timestampType = (TimestampType) type;
+ switch (timestampType.getKind()) {
+ case ROWTIME:
+ relType =
createRowtimeIndicatorType(type.isNullable(), false);
+ break;
+ case REGULAR:
+ relType = createSqlType(TIMESTAMP,
timestampType.getPrecision());
+ break;
+ case PROCTIME:
+ throw new TableException(
+ "Processing time indicator only supports"
+ + " LocalZonedTimestampType, but
actual is TimestampType."
+ + " This is a bug in planner, please
file an issue.");
+ default:
+ throw new TableException(
+ "Unsupported TimestampKind: " +
timestampType.getKind());
+ }
+ break;
+
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final LocalZonedTimestampType lzTs = (LocalZonedTimestampType)
type;
+ switch (lzTs.getKind()) {
+ case PROCTIME:
+ relType =
createProctimeIndicatorType(type.isNullable());
+ break;
+ case ROWTIME:
+ relType =
createRowtimeIndicatorType(type.isNullable(), true);
+ break;
+ case REGULAR:
+ relType =
+ createSqlType(TIMESTAMP_WITH_LOCAL_TIME_ZONE,
lzTs.getPrecision());
+ break;
+ default:
+ throw new TableException("Unsupported TimestampKind: "
+ lzTs.getKind());
+ }
+ break;
+ default:
+ final RelDataType seenType = seenTypes.get(type);
+ if (seenType != null) {
+ relType = seenType;
+ } else {
+ final RelDataType refType = newRelDataType(type);
+ seenTypes.put(type, refType);
+ relType = refType;
+ }
+ }
+
+ return createTypeWithNullability(relType, type.isNullable());
+ }
+
+ private RelDataType newRelDataType(LogicalType logicalType) {
+ switch (logicalType.getTypeRoot()) {
+ case NULL:
+ return createSqlType(SqlTypeName.NULL);
+ case BOOLEAN:
+ return createSqlType(SqlTypeName.BOOLEAN);
+ case TINYINT:
+ return createSqlType(SqlTypeName.TINYINT);
+ case SMALLINT:
+ return createSqlType(SqlTypeName.SMALLINT);
+ case INTEGER:
+ return createSqlType(SqlTypeName.INTEGER);
+ case BIGINT:
+ return createSqlType(SqlTypeName.BIGINT);
+ case FLOAT:
+ return createSqlType(SqlTypeName.FLOAT);
+ case DOUBLE:
+ return createSqlType(SqlTypeName.DOUBLE);
+ case VARCHAR:
+ return createSqlType(SqlTypeName.VARCHAR, ((VarCharType)
logicalType).getLength());
+ case CHAR:
+ return createSqlType(SqlTypeName.CHAR, ((CharType)
logicalType).getLength());
+
+ // temporal types
+ case DATE:
+ return createSqlType(SqlTypeName.DATE);
+ case TIME_WITHOUT_TIME_ZONE:
+ return createSqlType(SqlTypeName.TIME, ((TimeType)
logicalType).getPrecision());
+
+ // interval types
+ case INTERVAL_YEAR_MONTH:
+ return createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.YEAR,
TimeUnit.MONTH, SqlParserPos.ZERO));
+ case INTERVAL_DAY_TIME:
+ return createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.DAY,
TimeUnit.SECOND, SqlParserPos.ZERO));
+
+ case BINARY:
+ return createSqlType(SqlTypeName.BINARY, ((BinaryType)
logicalType).getLength());
+ case VARBINARY:
+ return createSqlType(VARBINARY, ((VarBinaryType)
logicalType).getLength());
+
+ case DECIMAL:
+ if (logicalType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) logicalType;
+ return createSqlType(
+ DECIMAL, decimalType.getPrecision(),
decimalType.getScale());
+ } else if (logicalType instanceof LegacyTypeInformationType) {
+ LegacyTypeInformationType<?> legacyType =
+ (LegacyTypeInformationType<?>) logicalType;
+ if (legacyType.getTypeInformation() ==
BasicTypeInfo.BIG_DEC_TYPE_INFO) {
+ return createSqlType(DECIMAL, 38, 18);
+ }
+ }
+ throw new TableException("Type is not supported: " +
logicalType);
+ case ROW:
+ final RowType rowType = (RowType) logicalType;
+ return buildStructType(
+ rowType.getFieldNames().toArray(new String[0]),
+ rowType.getChildren().toArray(new LogicalType[0]),
+ // fields are not expanded in "SELECT *"
+ StructKind.PEEK_FIELDS_NO_EXPAND);
+
+ case STRUCTURED_TYPE:
+ if (logicalType instanceof StructuredType) {
+ return StructuredRelDataType.create(this, (StructuredType)
logicalType);
+ } else if (logicalType instanceof LegacyTypeInformationType) {
+ return createFieldTypeFromLogicalType(
+ PlannerTypeUtils.removeLegacyTypes(logicalType));
+ }
+ throw new TableException("Type is not supported: " +
logicalType);
+ case ARRAY:
+ final ArrayType arrayType = (ArrayType) logicalType;
+ return createArrayType(
+
createFieldTypeFromLogicalType(arrayType.getElementType()), -1);
+
+ case MAP:
+ final MapType mapType = (MapType) logicalType;
+ return createMapType(
+ createFieldTypeFromLogicalType(mapType.getKeyType()),
+
createFieldTypeFromLogicalType(mapType.getValueType()));
+
+ case MULTISET:
+ final MultisetType multisetType = (MultisetType) logicalType;
+ return createMultisetType(
+
createFieldTypeFromLogicalType(multisetType.getElementType()), -1);
+
+ case RAW:
+ if (logicalType instanceof RawType) {
+ return new RawRelDataType((RawType<?>) logicalType);
+ } else if (logicalType instanceof TypeInformationRawType) {
+ return new GenericRelDataType(
+ (TypeInformationRawType<?>) logicalType, true,
getTypeSystem());
+ } else if (logicalType instanceof LegacyTypeInformationType) {
+ return createFieldTypeFromLogicalType(
+ PlannerTypeUtils.removeLegacyTypes(logicalType));
+ }
+ throw new TableException("Type is not supported: " +
logicalType);
+
+ case SYMBOL:
+ return createSqlType(SqlTypeName.SYMBOL);
+
+ case DESCRIPTOR:
+ return createSqlType(SqlTypeName.COLUMN_LIST);
+
+ case VARIANT:
+ return createSqlType(SqlTypeName.VARIANT);
+
+ case BITMAP:
+ return new BitmapRelDataType((BitmapType) logicalType);
+
+ default:
+ throw new TableException("Type is not supported: " +
logicalType);
+ }
+ }
+
+ /**
+ * This is a safety check in case the null type ends up in the type
factory for other use cases
+ * than untyped NULL literals.
+ */
+ private void checkForNullType(RelDataType... childTypes) {
+ for (RelDataType childType : childTypes) {
+
+ if (childType.getSqlTypeName() == SqlTypeName.NULL) {
+ throw new ValidationException(
+ "The null type is reserved for representing untyped
NULL literals. It should not be "
+ + "used in constructed types. Please cast NULL
literals to a more explicit type.");
+ }
+ }
+ }
+
+ /**
+ * Creates a indicator type for processing-time, but with similar
properties as SQL timestamp.
+ */
+ public RelDataType createProctimeIndicatorType(boolean isNullable) {
+ final RelDataType originalType =
+ createFieldTypeFromLogicalType(new
LocalZonedTimestampType(isNullable, 3));
+ return canonize(
+ new TimeIndicatorRelDataType(
+ getTypeSystem(), (BasicSqlType) originalType,
isNullable, false));
+ }
+
+ /** Creates a indicator type for event-time, but with similar properties
as SQL timestamp. */
+ public RelDataType createRowtimeIndicatorType(boolean isNullable, boolean
isTimestampLtz) {
+ final RelDataType originalType =
+ (isTimestampLtz)
+ ? createFieldTypeFromLogicalType(new
LocalZonedTimestampType(isNullable, 3))
+ : createFieldTypeFromLogicalType(new
TimestampType(isNullable, 3));
+
+ return canonize(
+ new TimeIndicatorRelDataType(
+ getTypeSystem(), (BasicSqlType) originalType,
isNullable, true));
+ }
+
+ public static RowType toLogicalRowType(RelDataType relType) {
+ Preconditions.checkArgument(relType.isStruct());
+ return RowType.of(
+ relType.getFieldList().stream()
+ .map(fieldType -> toLogicalType(fieldType.getType()))
+ .collect(Collectors.toList())
+ .toArray(new LogicalType[0]),
+ relType.getFieldNames().toArray(new String[0]));
+ }
+
+ public static boolean isTimeIndicatorType(LogicalType logicalType) {
+ return logicalType instanceof TimestampType
+ && ((TimestampType) logicalType).getKind() ==
TimestampKind.ROWTIME
+ || logicalType instanceof LocalZonedTimestampType
+ && ((LocalZonedTimestampType) logicalType).getKind()
+ == TimestampKind.PROCTIME;
+ }
+
+ public static boolean isTimeIndicatorType(RelDataType relDataType) {
+ return relDataType instanceof TimeIndicatorRelDataType;
+ }
+
+ public static boolean isRowtimeIndicatorType(RelDataType relDataType) {
+ return relDataType instanceof TimeIndicatorRelDataType
+ && ((TimeIndicatorRelDataType) relDataType).isEventTime();
+ }
+
+ public static boolean isProctimeIndicatorType(RelDataType relDataType) {
+ return relDataType instanceof TimeIndicatorRelDataType
+ && !((TimeIndicatorRelDataType) relDataType).isEventTime();
+ }
+
+ public static boolean isTimestampLtzIndicatorType(RelDataType relDataType)
{
+ return relDataType instanceof TimeIndicatorRelDataType
+ && ((TimeIndicatorRelDataType) relDataType)
+ .originalType()
+ .getSqlTypeName()
+ .equals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ }
+
+ @Deprecated
+ public static boolean isProctimeIndicatorType(TypeInformation<?> typeInfo)
{
+ return typeInfo instanceof TimeIndicatorTypeInfo
+ && !((TimeIndicatorTypeInfo) typeInfo).isEventTime();
+ }
+
+ @Deprecated
+ public static boolean isRowtimeIndicatorType(TypeInformation<?> typeInfo) {
+ return typeInfo instanceof TimeIndicatorTypeInfo
+ && ((TimeIndicatorTypeInfo) typeInfo).isEventTime();
+ }
+
+ @Deprecated
+ public static boolean isTimeIndicatorType(TypeInformation<?> typeInfo) {
+ return typeInfo instanceof TimeIndicatorTypeInfo;
+ }
+
+ /** Returns a projected {@link RelDataType} of the structure type. */
+ public RelDataType projectStructType(RelDataType relType, int[]
selectedFields) {
+ final List<RelDataTypeField> fields = new ArrayList<>();
+ for (int selectedField : selectedFields) {
+ fields.add(relType.getFieldList().get(selectedField));
+ }
+ return this.createStructType(fields);
+ }
+
+ /**
+ * Creates a struct type with the input fieldNames and input fieldTypes
using FlinkTypeFactory.
+ *
+ * @param tableSchema schema to convert to Calcite's specific one
+ * @return a struct type with the input fieldNames, input fieldTypes, and
system fields
+ */
+ public RelDataType buildRelNodeRowType(TableSchema tableSchema) {
+ return buildRelNodeRowType(
+ tableSchema.getFieldNames(),
+ Arrays.stream(tableSchema.getFieldDataTypes())
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new));
+ }
+
+ /**
+ * Creates a table row type with the input fieldNames and input fieldTypes
using
+ * FlinkTypeFactory. Table row type is table schema for Calcite RelNode.
See getRowType of
+ * {@link RelNode}. Use FULLY_QUALIFIED to let each field must be
referenced explicitly.
+ */
+ public RelDataType buildRelNodeRowType(RowType rowType) {
+ final List<RowType.RowField> fields = rowType.getFields();
+ return buildStructType(
+
fields.stream().map(RowType.RowField::getName).toArray(String[]::new),
+
fields.stream().map(RowType.RowField::getType).toArray(LogicalType[]::new),
+ StructKind.FULLY_QUALIFIED);
+ }
+
+ /**
+ * Creates a table row type with the input fieldNames and input fieldTypes
using
+ * FlinkTypeFactory. Table row type is table schema for Calcite RelNode.
See getRowType of
+ * {@link RelNode}. Use FULLY_QUALIFIED to let each field must be
referenced explicitly.
+ *
+ * @param fieldNames field names
+ * @param fieldTypes field types, every element is Flink's {@link
LogicalType}
+ * @return a table row type with the input fieldNames, input fieldTypes.
+ */
+ public RelDataType buildRelNodeRowType(String[] fieldNames, LogicalType[]
fieldTypes) {
+ return buildStructType(fieldNames, fieldTypes,
StructKind.FULLY_QUALIFIED);
+ }
+
+ /**
+ * Creates a table row type with the given field names and field types.
Table row type is table
+ * schema for Calcite {@link RelNode}. See {@link RelNode#getRowType()}.
+ *
+ * <p>It uses {@link StructKind#FULLY_QUALIFIED} to let each field must be
referenced
+ * explicitly.
+ *
+ * @param fieldNames field names
+ * @param fieldTypes field types, every element is Flink's {@link
LogicalType}
+ * @return a table row type with the input fieldNames, input fieldTypes.
+ */
+ public RelDataType buildRelNodeRowType(List<String> fieldNames,
List<LogicalType> fieldTypes) {
+ return buildStructType(
+ fieldNames.toArray(new String[0]),
+ fieldTypes.toArray(new LogicalType[0]),
+ StructKind.FULLY_QUALIFIED);
+ }
+
+ /**
+ * Creates a table row type with the input fieldNames and input fieldTypes
using
+ * FlinkTypeFactory. Table row type is table schema for Calcite RelNode.
See getRowType of
+ * {@link RelNode}. Use FULLY_QUALIFIED to let each field must be
referenced explicitly.
+ *
+ * @param fieldNames field names
+ * @param fieldTypes field types, every element is Flink's {@link
LogicalType}
+ * @return a table row type with the input fieldNames, input fieldTypes.
+ */
+ public RelDataType buildRelNodeRowType(Seq<String> fieldNames,
Seq<LogicalType> fieldTypes) {
+ return buildRelNodeRowType(
+ JavaConverters.seqAsJavaList(fieldNames),
JavaConverters.seqAsJavaList(fieldTypes));
+ }
+
+ public static LogicalType toLogicalType(RelDataType relType) {
+ return
toLogicalTypeWithoutNullability(relType).copy(relType.isNullable());
+ }
+
+ public static TableSchema toTableSchema(RelDataType relDataType) {
+ final String[] fieldNames = relDataType.getFieldNames().toArray(new
String[0]);
+ final DataType[] fieldTypes =
+ relDataType.getFieldList().stream()
+ .map(
+ field ->
+
LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+
FlinkTypeFactory.toLogicalType(field.getType())))
+ .toArray(DataType[]::new);
+ return TableSchema.builder().fields(fieldNames, fieldTypes).build();
+ }
+
+ /**
+ * Creates a struct type with the physical columns using FlinkTypeFactory.
+ *
+ * @param tableSchema schema to convert to Calcite's specific one
+ * @return a struct type with the input fieldNames, input fieldTypes.
+ */
+ public RelDataType buildPhysicalRelNodeRowType(TableSchema tableSchema) {
+ return
buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema));
+ }
+
+ /**
+ * Creates a struct type with the persisted columns using FlinkTypeFactory.
+ *
+ * @param tableSchema schema to convert to Calcite's specific one
+ * @return a struct type with the input fieldsNames, input fieldTypes.
+ */
+ public RelDataType buildPersistedRelNodeRowType(TableSchema tableSchema) {
+ return
buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema));
+ }
+
+ private static LogicalType toLogicalTypeWithoutNullability(RelDataType
relDataType) {
+ switch (relDataType.getSqlTypeName()) {
+ case BOOLEAN:
+ return new BooleanType();
+ case TINYINT:
+ return new TinyIntType();
+ case SMALLINT:
+ return new SmallIntType();
+ case INTEGER:
+ return new IntType();
+ case BIGINT:
+ return new BigIntType();
+ case FLOAT:
+ return new FloatType();
+ case DOUBLE:
+ return new DoubleType();
+ case CHAR:
+ if (relDataType.getPrecision() == 0) {
+ return CharType.ofEmptyLiteral();
+ } else {
+ return new CharType(relDataType.getPrecision());
+ }
+ case VARCHAR:
+ if (relDataType.getPrecision() == 0) {
+ return VarCharType.ofEmptyLiteral();
+ } else {
+ return new VarCharType(relDataType.getPrecision());
+ }
+ case BINARY:
+ if (relDataType.getPrecision() == 0) {
+ return BinaryType.ofEmptyLiteral();
+ } else {
+ return new BinaryType(relDataType.getPrecision());
+ }
+ case VARBINARY:
+ if (relDataType.getPrecision() == 0) {
+ return VarBinaryType.ofEmptyLiteral();
+ } else {
+ return new VarBinaryType(relDataType.getPrecision());
+ }
+ case DECIMAL:
+ return new DecimalType(relDataType.getPrecision(),
relDataType.getScale());
+
+ case TIMESTAMP:
+ if (relDataType instanceof TimeIndicatorRelDataType) {
+ final TimeIndicatorRelDataType indicator =
+ (TimeIndicatorRelDataType) relDataType;
+ if (indicator.isEventTime()) {
+ return new TimestampType(true, TimestampKind.ROWTIME,
3);
+ } else {
+ throw new TableException(
+ "Processing time indicator only supports"
+ + " LocalZonedTimestampType, but
actual is TimestampType."
+ + " This is a bug in planner, please
file an issue.");
+ }
+ } else {
+ return new TimestampType(relDataType.getPrecision());
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if (relDataType instanceof TimeIndicatorRelDataType) {
+ TimeIndicatorRelDataType indicator =
(TimeIndicatorRelDataType) relDataType;
+ if (indicator.isEventTime()) {
+ return new LocalZonedTimestampType(true,
TimestampKind.ROWTIME, 3);
+ } else {
+ return new LocalZonedTimestampType(true,
TimestampKind.PROCTIME, 3);
+ }
+ }
+ return new LocalZonedTimestampType(relDataType.getPrecision());
+
+ case DATE:
+ return new DateType();
+ case TIME:
+ return new TimeType(relDataType.getPrecision());
+
+ case INTERVAL_YEAR:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_MONTH:
+ return DataTypes.INTERVAL(DataTypes.MONTH()).getLogicalType();
+
+ case INTERVAL_DAY:
+ case INTERVAL_DAY_HOUR:
+ case INTERVAL_DAY_MINUTE:
+ case INTERVAL_DAY_SECOND:
+ case INTERVAL_HOUR:
+ case INTERVAL_HOUR_MINUTE:
+ case INTERVAL_HOUR_SECOND:
+ case INTERVAL_MINUTE:
+ case INTERVAL_MINUTE_SECOND:
+ case INTERVAL_SECOND:
+ if (relDataType.getPrecision() > 3) {
+ throw new TableException(
+ "DAY_INTERVAL_TYPES precision is not supported: "
+ + relDataType.getPrecision());
+ }
+ return
DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType();
+
+ case NULL:
+ return new NullType();
+
+ case SYMBOL:
+ return new SymbolType();
+
+ case COLUMN_LIST:
+ return new DescriptorType();
+
+ // extract encapsulated Type
+ case ANY:
+ if (relDataType instanceof GenericRelDataType) {
+ final GenericRelDataType genericRelDataType =
(GenericRelDataType) relDataType;
+ return genericRelDataType.genericType();
+ } else {
+ throw new TableException("Type is not supported: " +
relDataType);
+ }
+
+ case ROW:
+ if (relDataType instanceof RelRecordType) {
+ return toLogicalRowType(relDataType);
+ } else {
+ throw new TableException("Type is not supported: " +
relDataType);
+ }
+
+ case STRUCTURED:
+ if (relDataType instanceof StructuredRelDataType) {
+ return ((StructuredRelDataType)
relDataType).getStructuredType();
+ } else {
+ throw new TableException("Type is not supported: " +
relDataType);
+ }
+
+ case MULTISET:
+ return new
MultisetType(toLogicalType(relDataType.getComponentType()));
+
+ case ARRAY:
+ return new
ArrayType(toLogicalType(relDataType.getComponentType()));
+
+ case MAP:
+ if (relDataType instanceof MapSqlType) {
+ final MapSqlType mapRelDataType = (MapSqlType) relDataType;
+ return new MapType(
+ toLogicalType(mapRelDataType.getKeyType()),
+ toLogicalType(mapRelDataType.getValueType()));
+ } else {
+ throw new TableException("Type is not supported: " +
relDataType);
+ }
+
+ // CURSOR for UDTF case, whose type info will never be used, just
a placeholder
+ case CURSOR:
+ return new TypeInformationRawType(new NothingTypeInfo());
+
+ case VARIANT:
+ return new VariantType();
+
+ case OTHER:
+ if (relDataType instanceof RawRelDataType) {
+ return ((RawRelDataType) relDataType).getRawType();
+ } else if (relDataType instanceof BitmapRelDataType) {
+ return ((BitmapRelDataType) relDataType).getBitmapType();
+ } else {
+ throw new TableException("Type is not supported: " +
relDataType);
+ }
+
+ default:
+ throw new TableException("Type is not supported: " +
relDataType);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
index d956f0a2990..2f8b61824b6 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
@@ -34,7 +34,6 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.type.RelDataType;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -81,8 +80,8 @@ public class BatchPhysicalLocalRuntimeFilterBuilder extends
SingleRel implements
protected RelDataType deriveRowType() {
return ((FlinkTypeFactory) getCluster().getTypeFactory())
.buildRelNodeRowType(
- Arrays.asList("actualRowCount", "filter"),
- Arrays.asList(new IntType(), new VarBinaryType()));
+ List.of("actualRowCount", "filter"),
+ List.of(new IntType(), new VarBinaryType()));
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
deleted file mode 100644
index e8e88833294..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ /dev/null
@@ -1,723 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.calcite
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NothingTypeInfo,
TypeInformation}
-import org.apache.flink.table.api.{DataTypes, TableException,
ValidationException}
-import org.apache.flink.table.calcite.ExtendedRelTypeFactory
-import org.apache.flink.table.legacy.api.TableSchema
-import org.apache.flink.table.legacy.types.logical.TypeInformationRawType
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
-import org.apache.flink.table.planner.plan.schema._
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.table.runtime.types.{LogicalTypeDataTypeConverter,
PlannerTypeUtils}
-import org.apache.flink.table.types.logical._
-import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.apache.flink.table.utils.TableSchemaUtils
-import org.apache.flink.types.Nothing
-import org.apache.flink.util.Preconditions.checkArgument
-
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.`type`.{BasicSqlType, MapSqlType, SqlTypeName}
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.util.ConversionUtil
-
-import java.nio.charset.Charset
-import java.util
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * Flink specific type factory that represents the interface between Flink's
[[LogicalType]] and
- * Calcite's [[RelDataType]].
- */
-class FlinkTypeFactory(
- classLoader: ClassLoader,
- typeSystem: RelDataTypeSystem = FlinkTypeSystem.INSTANCE)
- extends JavaTypeFactoryImpl(typeSystem)
- with ExtendedRelTypeFactory {
-
- private val seenTypes = mutable.HashMap[LogicalType, RelDataType]()
-
- /**
- * Create a calcite field type in table schema from [[LogicalType]]. It use
PEEK_FIELDS_NO_EXPAND
- * when type is a nested struct type (Flink [[RowType]]).
- *
- * @param t
- * flink logical type.
- * @return
- * calcite [[RelDataType]].
- */
- def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
- def newRelDataType(): RelDataType = t.getTypeRoot match {
- case LogicalTypeRoot.NULL => createSqlType(NULL)
- case LogicalTypeRoot.BOOLEAN => createSqlType(BOOLEAN)
- case LogicalTypeRoot.TINYINT => createSqlType(TINYINT)
- case LogicalTypeRoot.SMALLINT => createSqlType(SMALLINT)
- case LogicalTypeRoot.INTEGER => createSqlType(INTEGER)
- case LogicalTypeRoot.BIGINT => createSqlType(BIGINT)
- case LogicalTypeRoot.FLOAT => createSqlType(FLOAT)
- case LogicalTypeRoot.DOUBLE => createSqlType(DOUBLE)
- case LogicalTypeRoot.VARCHAR => createSqlType(VARCHAR,
t.asInstanceOf[VarCharType].getLength)
- case LogicalTypeRoot.CHAR => createSqlType(CHAR,
t.asInstanceOf[CharType].getLength)
-
- // temporal types
- case LogicalTypeRoot.DATE => createSqlType(DATE)
- case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE =>
- createSqlType(TIME, t.asInstanceOf[TimeType].getPrecision)
-
- // interval types
- case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH,
SqlParserPos.ZERO))
- case LogicalTypeRoot.INTERVAL_DAY_TIME =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND,
SqlParserPos.ZERO))
-
- case LogicalTypeRoot.BINARY => createSqlType(BINARY,
t.asInstanceOf[BinaryType].getLength)
- case LogicalTypeRoot.VARBINARY =>
- createSqlType(VARBINARY, t.asInstanceOf[VarBinaryType].getLength)
-
- case LogicalTypeRoot.DECIMAL =>
- t match {
- case decimalType: DecimalType =>
- createSqlType(DECIMAL, decimalType.getPrecision,
decimalType.getScale)
- case legacyType: LegacyTypeInformationType[_]
- if legacyType.getTypeInformation ==
BasicTypeInfo.BIG_DEC_TYPE_INFO =>
- createSqlType(DECIMAL, 38, 18)
- }
-
- case LogicalTypeRoot.ROW =>
- val rowType = t.asInstanceOf[RowType]
- buildStructType(
- rowType.getFieldNames,
- rowType.getChildren,
- // fields are not expanded in "SELECT *"
- StructKind.PEEK_FIELDS_NO_EXPAND)
-
- case LogicalTypeRoot.STRUCTURED_TYPE =>
- t match {
- case structuredType: StructuredType =>
StructuredRelDataType.create(this, structuredType)
- case legacyTypeInformationType: LegacyTypeInformationType[_] =>
- createFieldTypeFromLogicalType(
- PlannerTypeUtils.removeLegacyTypes(legacyTypeInformationType))
- }
-
- case LogicalTypeRoot.ARRAY =>
- val arrayType = t.asInstanceOf[ArrayType]
-
createArrayType(createFieldTypeFromLogicalType(arrayType.getElementType), -1)
-
- case LogicalTypeRoot.MAP =>
- val mapType = t.asInstanceOf[MapType]
- createMapType(
- createFieldTypeFromLogicalType(mapType.getKeyType),
- createFieldTypeFromLogicalType(mapType.getValueType))
-
- case LogicalTypeRoot.MULTISET =>
- val multisetType = t.asInstanceOf[MultisetType]
-
createMultisetType(createFieldTypeFromLogicalType(multisetType.getElementType),
-1)
-
- case LogicalTypeRoot.RAW =>
- t match {
- case rawType: RawType[_] =>
- new RawRelDataType(rawType)
- case genericType: TypeInformationRawType[_] =>
- new GenericRelDataType(genericType, true, getTypeSystem)
- case legacyType: LegacyTypeInformationType[_] =>
-
createFieldTypeFromLogicalType(PlannerTypeUtils.removeLegacyTypes(legacyType))
- }
-
- case LogicalTypeRoot.SYMBOL =>
- createSqlType(SqlTypeName.SYMBOL)
-
- case LogicalTypeRoot.DESCRIPTOR =>
- createSqlType(SqlTypeName.COLUMN_LIST)
-
- case LogicalTypeRoot.VARIANT =>
- createSqlType(SqlTypeName.VARIANT)
-
- case LogicalTypeRoot.BITMAP =>
- new BitmapRelDataType(t.asInstanceOf[BitmapType])
-
- case _ @t =>
- throw new TableException(s"Type is not supported: $t")
- }
-
- // Kind in TimestampType do not affect the hashcode and equals, So we
can't put it to seenTypes
- val relType = t.getTypeRoot match {
- case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
- val timestampType = t.asInstanceOf[TimestampType]
- timestampType.getKind match {
- case TimestampKind.ROWTIME =>
createRowtimeIndicatorType(t.isNullable, false)
- case TimestampKind.REGULAR => createSqlType(TIMESTAMP,
timestampType.getPrecision)
- case TimestampKind.PROCTIME =>
- throw new TableException(
- s"Processing time indicator only supports" +
- s" LocalZonedTimestampType, but actual is TimestampType." +
- s" This is a bug in planner, please file an issue.")
- }
- case LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
- val lzTs = t.asInstanceOf[LocalZonedTimestampType]
- lzTs.getKind match {
- case TimestampKind.PROCTIME =>
createProctimeIndicatorType(t.isNullable)
- case TimestampKind.ROWTIME =>
createRowtimeIndicatorType(t.isNullable, true)
- case TimestampKind.REGULAR =>
- createSqlType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, lzTs.getPrecision)
- }
- case _ =>
- seenTypes.get(t) match {
- case Some(retType: RelDataType) => retType
- case None =>
- val refType = newRelDataType()
- seenTypes.put(t, refType)
- refType
- }
- }
-
- createTypeWithNullability(relType, t.isNullable)
- }
-
- /** Creates a indicator type for processing-time, but with similar
properties as SQL timestamp. */
- def createProctimeIndicatorType(isNullable: Boolean): RelDataType = {
- val originalType = createFieldTypeFromLogicalType(new
LocalZonedTimestampType(isNullable, 3))
- canonize(
- new TimeIndicatorRelDataType(
- getTypeSystem,
- originalType.asInstanceOf[BasicSqlType],
- isNullable,
- isEventTime = false))
- }
-
- /** Creates a indicator type for event-time, but with similar properties as
SQL timestamp. */
- def createRowtimeIndicatorType(isNullable: Boolean, isTimestampLtz:
Boolean): RelDataType = {
- val originalType = if (isTimestampLtz) {
- createFieldTypeFromLogicalType(new LocalZonedTimestampType(isNullable,
3))
- } else {
- createFieldTypeFromLogicalType(new TimestampType(isNullable, 3))
- }
-
- canonize(
- new TimeIndicatorRelDataType(
- getTypeSystem,
- originalType.asInstanceOf[BasicSqlType],
- isNullable,
- isEventTime = true))
- }
-
- /**
- * Creates a struct type with the input fieldNames and input fieldTypes
using FlinkTypeFactory
- *
- * @param tableSchema
- * schema to convert to Calcite's specific one
- * @return
- * a struct type with the input fieldNames, input fieldTypes, and system
fields
- */
- def buildRelNodeRowType(tableSchema: TableSchema): RelDataType = {
- buildRelNodeRowType(
- tableSchema.getFieldNames,
- tableSchema.getFieldDataTypes.map(_.getLogicalType))
- }
-
- /**
- * Creates a table row type with the given field names and field types.
Table row type is table
- * schema for Calcite [[RelNode]]. See [[RelNode#getRowType]].
- *
- * It uses [[StructKind#FULLY_QUALIFIED]] to let each field must be
referenced explicitly.
- *
- * @param fieldNames
- * field names
- * @param fieldTypes
- * field types, every element is Flink's [[LogicalType]]
- * @return
- * a table row type with the input fieldNames, input fieldTypes.
- */
- def buildRelNodeRowType(
- fieldNames: util.List[String],
- fieldTypes: util.List[LogicalType]): RelDataType = {
- buildStructType(fieldNames, fieldTypes, StructKind.FULLY_QUALIFIED)
- }
-
- /**
- * Creates a table row type with the input fieldNames and input fieldTypes
using FlinkTypeFactory.
- * Table row type is table schema for Calcite RelNode. See getRowType of
[[RelNode]]. Use
- * FULLY_QUALIFIED to let each field must be referenced explicitly.
- *
- * @param fieldNames
- * field names
- * @param fieldTypes
- * field types, every element is Flink's [[LogicalType]]
- * @return
- * a table row type with the input fieldNames, input fieldTypes.
- */
- def buildRelNodeRowType(fieldNames: Seq[String], fieldTypes:
Seq[LogicalType]): RelDataType = {
- buildStructType(fieldNames, fieldTypes, StructKind.FULLY_QUALIFIED)
- }
-
- /**
- * Creates a table row type with the input fieldNames and input fieldTypes
using FlinkTypeFactory.
- * Table row type is table schema for Calcite RelNode. See getRowType of
[[RelNode]]. Use
- * FULLY_QUALIFIED to let each field must be referenced explicitly.
- */
- def buildRelNodeRowType(rowType: RowType): RelDataType = {
- val fields = rowType.getFields
- buildStructType(fields.map(_.getName), fields.map(_.getType),
StructKind.FULLY_QUALIFIED)
- }
-
- /**
- * Creates a struct type with the physical columns using FlinkTypeFactory
- *
- * @param tableSchema
- * schema to convert to Calcite's specific one
- * @return
- * a struct type with the input fieldNames, input fieldTypes.
- */
- def buildPhysicalRelNodeRowType(tableSchema: TableSchema): RelDataType = {
- buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema))
- }
-
- /**
- * Creats a struct type with the persisted columns using FlinkTypeFactory
- *
- * @param tableSchema
- * schema to convert to Calcite's specific one
- * @return
- * a struct type with the input fieldsNames, input fieldTypes.
- */
- def buildPersistedRelNodeRowType(tableSchema: TableSchema): RelDataType = {
- buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema))
- }
-
- /**
- * Creates a struct type with the input fieldNames and input fieldTypes
using FlinkTypeFactory.
- *
- * @param fieldNames
- * field names
- * @param fieldTypes
- * field types, every element is Flink's [[LogicalType]].
- * @param structKind
- * Name resolution policy. See more information in [[StructKind]].
- * @return
- * a struct type with the input fieldNames, input fieldTypes.
- */
- private def buildStructType(
- fieldNames: Seq[String],
- fieldTypes: Seq[LogicalType],
- structKind: StructKind): RelDataType = {
- val b = builder
- b.kind(structKind)
- val fields = fieldNames.zip(fieldTypes)
- fields.foreach {
- case (fieldName, fieldType) =>
- val fieldRelDataType = createFieldTypeFromLogicalType(fieldType)
- checkForNullType(fieldRelDataType)
- b.add(fieldName, fieldRelDataType)
- }
- b.build
- }
-
- /** Returns a projected [[RelDataType]] of the structure type. */
- def projectStructType(relType: RelDataType, selectedFields: Array[Int]):
RelDataType = {
- this.createStructType(
- selectedFields
- .map(idx => relType.getFieldList.get(idx))
- .toList
- .asJava)
- }
-
- //
----------------------------------------------------------------------------------------------
-
- override def getJavaClass(`type`: RelDataType): java.lang.reflect.Type = {
- if (`type`.getSqlTypeName == FLOAT) {
- if (`type`.isNullable) {
- classOf[java.lang.Float]
- } else {
- java.lang.Float.TYPE
- }
- } else {
- super.getJavaClass(`type`)
- }
- }
-
- override def createSqlType(typeName: SqlTypeName, precision: Int):
RelDataType = {
- // it might happen that inferred VARCHAR types overflow as we set them to
Int.MaxValue
- // Calcite will limit the length of the VARCHAR type to 65536.
- if (typeName == VARCHAR && precision < 0) {
- createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
- } else {
- super.createSqlType(typeName, precision)
- }
- }
-
- override def createArrayType(elementType: RelDataType, maxCardinality:
Long): RelDataType = {
- // Just validate type, make sure there is a failure in validate phase.
- checkForNullType(elementType)
- toLogicalType(elementType)
- super.createArrayType(elementType, maxCardinality)
- }
-
- override def createMapType(keyType: RelDataType, valueType: RelDataType):
RelDataType = {
- // Just validate type, make sure there is a failure in validate phase.
- checkForNullType(keyType, valueType)
- toLogicalType(keyType)
- toLogicalType(valueType)
- super.createMapType(keyType, valueType)
- }
-
- override def createMultisetType(elementType: RelDataType, maxCardinality:
Long): RelDataType = {
- // Just validate type, make sure there is a failure in validate phase.
- checkForNullType(elementType)
- toLogicalType(elementType)
- super.createMultisetType(elementType, maxCardinality)
- }
-
- override def createRawType(className: String, serializerString: String):
RelDataType = {
- val rawType = RawType.restore(classLoader, className, serializerString)
- val rawRelDataType = createFieldTypeFromLogicalType(rawType)
- canonize(rawRelDataType)
- }
-
- override def createStructuredType(
- className: String,
- fieldTypes: util.List[RelDataType],
- fieldNames: util.List[String]): RelDataType = {
- val resolvedClass = toScala(StructuredType.resolveClass(classLoader,
className))
- val builder = resolvedClass
- .map(StructuredType.newBuilder)
- .getOrElse(StructuredType.newBuilder(className))
-
- val relFields = 0
- .until(fieldTypes.size())
- .map(i => new RelDataTypeFieldImpl(fieldNames.get(i), i,
fieldTypes.get(i)))
- .map(_.asInstanceOf[RelDataTypeField])
- .toList
-
- val attributes =
- relFields.map(f => new StructuredAttribute(f.getName,
toLogicalType(f.getType)))
- builder.attributes(attributes.asJava)
-
- val relDataType = new StructuredRelDataType(builder.build(),
relFields.asJava)
- canonize(relDataType)
- }
-
- override def createBitmapType(): RelDataType = {
- canonize(new BitmapRelDataType(new BitmapType()))
- }
-
- override def createSqlType(typeName: SqlTypeName): RelDataType = {
- if (typeName == DECIMAL) {
- // if we got here, the precision and scale are not specified, here we
- // keep precision/scale in sync with our type system's default value,
- // see DecimalType.USER_DEFAULT.
- createSqlType(typeName, DecimalType.DEFAULT_PRECISION,
DecimalType.DEFAULT_SCALE)
- } else {
- super.createSqlType(typeName)
- }
- }
-
- override def createTypeWithNullability(
- relDataType: RelDataType,
- isNullable: Boolean): RelDataType = {
-
- // nullability change not necessary
- if (relDataType.isNullable == isNullable) {
- return canonize(relDataType)
- }
-
- // change nullability
- val newType = relDataType match {
- case raw: RawRelDataType =>
- raw.createWithNullability(isNullable)
-
- case structured: StructuredRelDataType =>
- structured.createWithNullability(isNullable)
-
- case bitmap: BitmapRelDataType =>
- bitmap.createWithNullability(isNullable)
-
- case generic: GenericRelDataType =>
- new GenericRelDataType(generic.genericType, isNullable, typeSystem)
-
- case it: TimeIndicatorRelDataType =>
- new TimeIndicatorRelDataType(
- it.typeSystemField,
- it.originalType,
- isNullable,
- it.isEventTime)
-
- // for nested rows we keep the nullability property,
- // top-level rows fall back to Calcite's default handling
- case rt: RelRecordType if rt.getStructKind ==
StructKind.PEEK_FIELDS_NO_EXPAND =>
- new RelRecordType(rt.getStructKind, rt.getFieldList, isNullable);
-
- case _ =>
- super.createTypeWithNullability(relDataType, isNullable)
- }
-
- canonize(newType)
- }
-
- override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
- val leastRestrictive = resolveAllIdenticalTypes(types)
- .getOrElse(super.leastRestrictive(types))
- // NULL is reserved for untyped literals only
- if (leastRestrictive == null || leastRestrictive.getSqlTypeName == NULL) {
- null
- } else {
- leastRestrictive
- }
- }
-
- private def resolveAllIdenticalTypes(types: util.List[RelDataType]):
Option[RelDataType] = {
- val allTypes = types.asScala
-
- val head = allTypes.head
- // check if all types are the same
- if (allTypes.forall(_ == head)) {
- // types are the same, check nullability
- val nullable = allTypes
- .exists(sqlType => sqlType.isNullable || sqlType.getSqlTypeName ==
SqlTypeName.NULL)
- // return type with nullability
- Some(createTypeWithNullability(head, nullable))
- } else {
- // types are not all the same
- if (allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)) {
- // one of the type was RAW.
- // we cannot generate a common type if it differs from other types.
- throw new TableException("Generic RAW types must have a common type
information.")
- } else {
- // cannot resolve a common type for different input types
- None
- }
- }
- }
-
- override def getDefaultCharset: Charset = {
- Charset.forName(ConversionUtil.NATIVE_UTF16_CHARSET_NAME)
- }
-
- /**
- * This is a safety check in case the null type ends up in the type factory
for other use cases
- * than untyped NULL literals.
- */
- private def checkForNullType(childTypes: RelDataType*): Unit = {
- childTypes.foreach {
- t =>
- if (t.getSqlTypeName == NULL) {
- throw new ValidationException(
- "The null type is reserved for representing untyped NULL literals.
It should not be " +
- "used in constructed types. Please cast NULL literals to a more
explicit type.")
- }
- }
- }
-}
-
-object FlinkTypeFactory {
-
- def isTimeIndicatorType(t: LogicalType): Boolean = t match {
- case t: TimestampType if t.getKind == TimestampKind.ROWTIME => true
- case ltz: LocalZonedTimestampType if ltz.getKind == TimestampKind.PROCTIME
=> true
- case _ => false
- }
-
- def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType
match {
- case _: TimeIndicatorRelDataType => true
- case _ => false
- }
-
- def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType
match {
- case ti: TimeIndicatorRelDataType if ti.isEventTime => true
- case _ => false
- }
-
- def isProctimeIndicatorType(relDataType: RelDataType): Boolean = relDataType
match {
- case ti: TimeIndicatorRelDataType if !ti.isEventTime => true
- case _ => false
- }
-
- def isTimestampLtzIndicatorType(relDataType: RelDataType): Boolean =
- relDataType match {
- case ti: TimeIndicatorRelDataType
- if
ti.originalType.getSqlTypeName.equals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
=>
- true
- case _ => false
- }
-
- @Deprecated
- def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean =
typeInfo match {
- case ti: TimeIndicatorTypeInfo if !ti.isEventTime => true
- case _ => false
- }
-
- @Deprecated
- def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo
match {
- case ti: TimeIndicatorTypeInfo if ti.isEventTime => true
- case _ => false
- }
-
- @Deprecated
- def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo
match {
- case ti: TimeIndicatorTypeInfo => true
- case _ => false
- }
-
- def toLogicalType(relDataType: RelDataType): LogicalType = {
- val logicalType = relDataType.getSqlTypeName match {
- case BOOLEAN => new BooleanType()
- case TINYINT => new TinyIntType()
- case SMALLINT => new SmallIntType()
- case INTEGER => new IntType()
- case BIGINT => new BigIntType()
- case FLOAT => new FloatType()
- case DOUBLE => new DoubleType()
- case CHAR =>
- if (relDataType.getPrecision == 0) {
- CharType.ofEmptyLiteral
- } else {
- new CharType(relDataType.getPrecision)
- }
- case VARCHAR =>
- if (relDataType.getPrecision == 0) {
- VarCharType.ofEmptyLiteral
- } else {
- new VarCharType(relDataType.getPrecision)
- }
- case BINARY =>
- if (relDataType.getPrecision == 0) {
- BinaryType.ofEmptyLiteral
- } else {
- new BinaryType(relDataType.getPrecision)
- }
- case VARBINARY =>
- if (relDataType.getPrecision == 0) {
- VarBinaryType.ofEmptyLiteral
- } else {
- new VarBinaryType(relDataType.getPrecision)
- }
- case DECIMAL => new DecimalType(relDataType.getPrecision,
relDataType.getScale)
-
- // time indicators
- case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
- val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
- if (indicator.isEventTime) {
- new TimestampType(true, TimestampKind.ROWTIME, 3)
- } else {
- throw new TableException(
- s"Processing time indicator only supports" +
- s" LocalZonedTimestampType, but actual is TimestampType." +
- s" This is a bug in planner, please file an issue.")
- }
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE if
relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
- val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
- if (indicator.isEventTime) {
- new LocalZonedTimestampType(true, TimestampKind.ROWTIME, 3)
- } else {
- new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3)
- }
-
- // temporal types
- case DATE => new DateType()
- case TIME =>
- new TimeType(relDataType.getPrecision)
- case TIMESTAMP =>
- new TimestampType(relDataType.getPrecision)
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
- new LocalZonedTimestampType(relDataType.getPrecision)
- case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
- DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
- case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
- if (relDataType.getPrecision > 3) {
- throw new TableException(
- s"DAY_INTERVAL_TYPES precision is not supported:
${relDataType.getPrecision}")
- }
- DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType
-
- case NULL =>
- new NullType()
-
- case SYMBOL =>
- new SymbolType()
-
- case COLUMN_LIST =>
- new DescriptorType()
-
- // extract encapsulated Type
- case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
- val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
- genericRelDataType.genericType
-
- case ROW if relDataType.isInstanceOf[RelRecordType] =>
- toLogicalRowType(relDataType)
-
- case STRUCTURED if relDataType.isInstanceOf[StructuredRelDataType] =>
- relDataType.asInstanceOf[StructuredRelDataType].getStructuredType
-
- case MULTISET => new
MultisetType(toLogicalType(relDataType.getComponentType))
-
- case ARRAY => new ArrayType(toLogicalType(relDataType.getComponentType))
-
- case MAP if relDataType.isInstanceOf[MapSqlType] =>
- val mapRelDataType = relDataType.asInstanceOf[MapSqlType]
- new MapType(
- toLogicalType(mapRelDataType.getKeyType),
- toLogicalType(mapRelDataType.getValueType))
-
- // CURSOR for UDTF case, whose type info will never be used, just a
placeholder
- case CURSOR => new TypeInformationRawType[Nothing](new NothingTypeInfo)
-
- case VARIANT => new VariantType()
-
- case OTHER if relDataType.isInstanceOf[RawRelDataType] =>
- relDataType.asInstanceOf[RawRelDataType].getRawType
-
- case OTHER if relDataType.isInstanceOf[BitmapRelDataType] =>
- relDataType.asInstanceOf[BitmapRelDataType].getBitmapType
-
- case _ @t =>
- throw new TableException(s"Type is not supported: $t")
- }
- logicalType.copy(relDataType.isNullable)
- }
-
- def toTableSchema(relDataType: RelDataType): TableSchema = {
- val fieldNames = relDataType.getFieldNames.toArray(new Array[String](0))
- val fieldTypes = relDataType.getFieldList.asScala
- .map(
- field =>
- LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
- FlinkTypeFactory.toLogicalType(field.getType)))
- .toArray
- TableSchema.builder.fields(fieldNames, fieldTypes).build
- }
-
- def toLogicalRowType(relType: RelDataType): RowType = {
- checkArgument(relType.isStruct)
- RowType.of(
- relType.getFieldList.asScala
- .map(fieldType => toLogicalType(fieldType.getType))
- .toArray[LogicalType],
- relType.getFieldNames.asScala.toArray)
- }
-}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
index fb26a6f9af5..867834b32e5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.PlannerMocks;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
@@ -76,23 +75,15 @@ public class AsyncCodeGeneratorTest {
.getPlannerContext()
.getTypeFactory()
.buildRelNodeRowType(
-
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
- JavaScalaConversionUtil.toScala(
- Arrays.asList(
- new IntType(),
- new BigIntType(),
- new VarCharType())));
+ List.of("f1", "f2", "f3"),
+ List.of(new IntType(), new BigIntType(), new
VarCharType()));
tableRowType2 =
plannerMocks
.getPlannerContext()
.getTypeFactory()
.buildRelNodeRowType(
-
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
- JavaScalaConversionUtil.toScala(
- Arrays.asList(
- new VarCharType(),
- new VarCharType(),
- new VarCharType())));
+ List.of("f1", "f2", "f3"),
+ List.of(new VarCharType(), new VarCharType(),
new VarCharType()));
ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster());
converter =
ShortcutUtils.unwrapContext(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
index 18daf6819ab..db5f7e6ec07 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.PlannerMocks;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
@@ -73,12 +72,8 @@ public class AsyncCorrelateCodeGeneratorTest {
.getPlannerContext()
.getTypeFactory()
.buildRelNodeRowType(
-
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
- JavaScalaConversionUtil.toScala(
- Arrays.asList(
- new IntType(),
- new BigIntType(),
- new VarCharType())));
+ List.of("f1", "f2", "f3"),
+ List.of(new IntType(), new BigIntType(), new
VarCharType()));
ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster());
plannerMocks
.getFunctionCatalog()
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
index c18330d73d8..f099952a58d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.PlannerMocks;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.logical.BigIntType;
@@ -36,8 +35,8 @@ import org.apache.calcite.rex.RexNode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import static
org.apache.flink.table.planner.plan.utils.AsyncUtil.containsAsyncCall;
@@ -61,12 +60,8 @@ public class AsyncUtilTest {
.getPlannerContext()
.getTypeFactory()
.buildRelNodeRowType(
-
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
- JavaScalaConversionUtil.toScala(
- Arrays.asList(
- new IntType(),
- new BigIntType(),
- new VarCharType())));
+ List.of("f1", "f2", "f3"),
+ List.of(new IntType(), new BigIntType(), new
VarCharType()));
converter =
ShortcutUtils.unwrapContext(
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index eabc29b01ee..ed7f02d040c 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -132,7 +132,7 @@ class AggCallSelectivityEstimatorTest {
private def createInputRef(index: Int): RexInputRef = {
val relDataType = typeFactory.createSqlType(allFieldTypes(index))
val relDataTypeWithNullability =
- typeFactory.createTypeWithNullability(relDataType, isNullable = false)
+ typeFactory.createTypeWithNullability(relDataType, false)
rexBuilder.makeInputRef(relDataTypeWithNullability, index)
}