This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ff839d0d6 [FLINK-39812][table] Migrate `FlinkTypeFactory` to java
50ff839d0d6 is described below

commit 50ff839d0d681160797468bbcf2bf0c917dd882a
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jun 1 23:06:25 2026 +0200

    [FLINK-39812][table] Migrate `FlinkTypeFactory` to java
---
 .../table/planner/calcite/FlinkTypeFactory.java    | 877 +++++++++++++++++++++
 .../BatchPhysicalLocalRuntimeFilterBuilder.java    |   5 +-
 .../table/planner/calcite/FlinkTypeFactory.scala   | 723 -----------------
 .../planner/codegen/AsyncCodeGeneratorTest.java    |  17 +-
 .../codegen/AsyncCorrelateCodeGeneratorTest.java   |   9 +-
 .../table/planner/plan/utils/AsyncUtilTest.java    |  11 +-
 .../metadata/AggCallSelectivityEstimatorTest.scala |   2 +-
 7 files changed, 889 insertions(+), 755 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeFactory.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeFactory.java
new file mode 100644
index 00000000000..fe18a71410e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeFactory.java
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.calcite.ExtendedRelTypeFactory;
+import org.apache.flink.table.legacy.api.TableSchema;
+import org.apache.flink.table.legacy.types.logical.TypeInformationRawType;
+import org.apache.flink.table.planner.plan.schema.BitmapRelDataType;
+import org.apache.flink.table.planner.plan.schema.GenericRelDataType;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
+import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.runtime.types.PlannerTypeUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BitmapType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DescriptorType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VariantType;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.MapSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ConversionUtil;
+
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
+import static org.apache.calcite.sql.type.SqlTypeName.TIMESTAMP;
+import static 
org.apache.calcite.sql.type.SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.calcite.sql.type.SqlTypeName.VARBINARY;
+
+/**
+ * Flink specific type factory that represents the interface between Flink's 
{@link LogicalType} and
+ * Calcite's {@link RelDataType}.
+ */
+public class FlinkTypeFactory extends JavaTypeFactoryImpl implements 
ExtendedRelTypeFactory {
+    private final Map<LogicalType, RelDataType> seenTypes = new HashMap<>();
+    private final ClassLoader classLoader;
+
+    /**
+     * Creates a type factory that uses the given type system.
+     *
+     * @param classLoader class loader stored in this factory for later use
+     * @param typeSystem type system passed to the parent factory
+     */
+    public FlinkTypeFactory(ClassLoader classLoader, RelDataTypeSystem 
typeSystem) {
+        super(typeSystem);
+        this.classLoader = classLoader;
+    }
+
+    /**
+     * Creates a type factory that uses {@link FlinkTypeSystem#INSTANCE}.
+     *
+     * @param classLoader class loader stored in this factory for later use
+     */
+    public FlinkTypeFactory(ClassLoader classLoader) {
+        this(classLoader, FlinkTypeSystem.INSTANCE);
+    }
+
+    /**
+     * Restores a {@link RawType} from its class name and serializer string and returns the
+     * canonized Calcite type created for it.
+     *
+     * @param className class name handed to {@code RawType.restore}
+     * @param serializerString serializer string handed to {@code RawType.restore}
+     * @return canonized {@link RelDataType} of the restored raw type
+     */
+    @Override
+    public RelDataType createRawType(String className, String serializerString) {
+        final RawType restored = RawType.restore(classLoader, className, serializerString);
+        return canonize(createFieldTypeFromLogicalType(restored));
+    }
+
+    /**
+     * Creates a canonized structured type for the given class name and fields.
+     *
+     * @param className name of the structured type's class; resolved through the class loader
+     *     when possible, otherwise used as a plain name
+     * @param fieldTypes Calcite types of the attributes, in order
+     * @param fieldNames names of the attributes, positionally aligned with {@code fieldTypes}
+     * @return canonized {@link StructuredRelDataType}
+     */
+    @Override
+    public RelDataType createStructuredType(
+            String className, List<RelDataType> fieldTypes, List<String> fieldNames) {
+        // Prefer the class-based builder; fall back to the name-based one if nothing resolves.
+        final Optional<Class<?>> implementationClass =
+                StructuredType.resolveClass(classLoader, className);
+        final StructuredType.Builder typeBuilder;
+        if (implementationClass.isPresent()) {
+            typeBuilder = StructuredType.newBuilder(implementationClass.get());
+        } else {
+            typeBuilder = StructuredType.newBuilder(className);
+        }
+
+        // First materialize all Calcite fields ...
+        final int fieldCount = fieldTypes.size();
+        final List<RelDataTypeField> relFields = new ArrayList<>(fieldCount);
+        for (int pos = 0; pos < fieldCount; pos++) {
+            relFields.add(
+                    new RelDataTypeFieldImpl(fieldNames.get(pos), pos, fieldTypes.get(pos)));
+        }
+
+        // ... then derive one structured attribute per field.
+        final List<StructuredType.StructuredAttribute> attributes = new ArrayList<>(fieldCount);
+        for (RelDataTypeField relField : relFields) {
+            attributes.add(
+                    new StructuredType.StructuredAttribute(
+                            relField.getName(), toLogicalType(relField.getType())));
+        }
+        typeBuilder.attributes(attributes);
+
+        return canonize(new StructuredRelDataType(typeBuilder.build(), relFields));
+    }
+
+    /** Creates the canonized Calcite type that wraps a new {@link BitmapType}. */
+    @Override
+    public RelDataType createBitmapType() {
+        final BitmapType bitmap = new BitmapType();
+        return canonize(new BitmapRelDataType(bitmap));
+    }
+
+    /**
+     * Creates an array type after validating the element type.
+     *
+     * @param elementType type of the array elements; must not be the NULL type
+     * @param maxCardinality maximum cardinality forwarded to the parent factory
+     * @return array type created by the parent factory
+     */
+    @Override
+    public RelDataType createArrayType(RelDataType elementType, long maxCardinality) {
+        // Validation only: both calls exist to fail early, during the validation phase.
+        checkForNullType(elementType);
+        toLogicalType(elementType);
+
+        final RelDataType arrayType = super.createArrayType(elementType, maxCardinality);
+        return arrayType;
+    }
+
+    /**
+     * Creates a map type after validating the key and value types.
+     *
+     * @param keyType type of the map keys; must not be the NULL type
+     * @param valueType type of the map values; must not be the NULL type
+     * @return map type created by the parent factory
+     */
+    @Override
+    public RelDataType createMapType(RelDataType keyType, RelDataType valueType) {
+        // Validation only: these calls exist to fail early, during the validation phase.
+        checkForNullType(keyType, valueType);
+        toLogicalType(keyType);
+        toLogicalType(valueType);
+
+        final RelDataType mapType = super.createMapType(keyType, valueType);
+        return mapType;
+    }
+
+    /**
+     * Creates a multiset type after validating the element type.
+     *
+     * @param elementType type of the multiset elements; must not be the NULL type
+     * @param maxCardinality maximum cardinality forwarded to the parent factory
+     * @return multiset type created by the parent factory
+     */
+    @Override
+    public RelDataType createMultisetType(RelDataType elementType, long maxCardinality) {
+        // Validation only: both calls exist to fail early, during the validation phase.
+        checkForNullType(elementType);
+        toLogicalType(elementType);
+
+        final RelDataType multisetType = super.createMultisetType(elementType, maxCardinality);
+        return multisetType;
+    }
+
+    /**
+     * Returns the Java class for the given type, special-casing {@code FLOAT}.
+     *
+     * @param type Calcite type to look up
+     * @return {@code Float.class} for nullable FLOAT, {@code Float.TYPE} for non-nullable FLOAT,
+     *     otherwise whatever the parent factory returns
+     */
+    @Override
+    public Type getJavaClass(RelDataType type) {
+        if (type.getSqlTypeName() != SqlTypeName.FLOAT) {
+            return super.getJavaClass(type);
+        }
+        // FLOAT: boxed class when nullable, primitive type otherwise
+        if (type.isNullable()) {
+            return Float.class;
+        }
+        return Float.TYPE;
+    }
+
+    /**
+     * Creates a SQL type without explicit precision or scale.
+     *
+     * @param typeName SQL type name
+     * @return for {@code DECIMAL}, a type using {@link DecimalType#DEFAULT_PRECISION} and {@link
+     *     DecimalType#DEFAULT_SCALE}; otherwise the parent factory's result
+     */
+    @Override
+    public RelDataType createSqlType(SqlTypeName typeName) {
+        if (typeName != SqlTypeName.DECIMAL) {
+            return super.createSqlType(typeName);
+        }
+        // Precision and scale were not specified: stay in sync with the defaults declared by
+        // DecimalType.
+        final int precision = DecimalType.DEFAULT_PRECISION;
+        final int scale = DecimalType.DEFAULT_SCALE;
+        return createSqlType(typeName, precision, scale);
+    }
+
+    /**
+     * Creates a SQL type with the given precision.
+     *
+     * @param typeName SQL type name
+     * @param precision requested precision; a negative value for {@code VARCHAR} is replaced by
+     *     the type system's default precision
+     * @return the created type
+     */
+    @Override
+    public RelDataType createSqlType(SqlTypeName typeName, int precision) {
+        // Inferred VARCHAR lengths may overflow because they are set to Integer.MAX_VALUE;
+        // Calcite will limit the length of the VARCHAR type to 65536.
+        final boolean overflowedVarchar = typeName == SqlTypeName.VARCHAR && precision < 0;
+        if (!overflowedVarchar) {
+            return super.createSqlType(typeName, precision);
+        }
+        return createSqlType(typeName, getTypeSystem().getDefaultPrecision(typeName));
+    }
+
+    /**
+     * Returns a canonized version of the given type with the requested nullability.
+     *
+     * <p>If the type already has the requested nullability it is only canonized. Otherwise a new
+     * type is created: raw, structured, bitmap, generic and time indicator types as well as
+     * records of kind {@code PEEK_FIELDS_NO_EXPAND} are handled here, everything else is
+     * delegated to the parent factory.
+     *
+     * @param relDataType type to adjust
+     * @param isNullable requested nullability
+     * @return canonized type with the requested nullability
+     */
+    @Override
+    public RelDataType createTypeWithNullability(RelDataType relDataType, 
boolean isNullable) {
+        // nullability change not necessary
+        if (relDataType.isNullable() == isNullable) {
+            return canonize(relDataType);
+        }
+
+        // change nullability
+        final RelDataType newType;
+        if (relDataType instanceof RawRelDataType) {
+            newType = ((RawRelDataType) 
relDataType).createWithNullability(isNullable);
+        } else if (relDataType instanceof StructuredRelDataType) {
+            newType = ((StructuredRelDataType) 
relDataType).createWithNullability(isNullable);
+        } else if (relDataType instanceof BitmapRelDataType) {
+            newType = ((BitmapRelDataType) 
relDataType).createWithNullability(isNullable);
+        } else if (relDataType instanceof GenericRelDataType) {
+            // rebuilt from the wrapped generic type with the new nullability
+            final GenericRelDataType generic = (GenericRelDataType) 
relDataType;
+            newType = new GenericRelDataType(generic.genericType(), 
isNullable, getTypeSystem());
+        } else if (relDataType instanceof TimeIndicatorRelDataType) {
+            // rebuilt with the same type system, original type and event-time flag
+            final TimeIndicatorRelDataType it = (TimeIndicatorRelDataType) 
relDataType;
+            newType =
+                    new TimeIndicatorRelDataType(
+                            it.typeSystemField(), it.originalType(), 
isNullable, it.isEventTime());
+        } else if (relDataType instanceof RelRecordType
+                && ((RelRecordType) relDataType).getStructKind()
+                        == StructKind.PEEK_FIELDS_NO_EXPAND) {
+            // for nested rows we keep the nullability property,
+            // top-level rows fall back to Calcite's default handling
+            final RelRecordType rt = (RelRecordType) relDataType;
+            newType = new RelRecordType(rt.getStructKind(), rt.getFieldList(), 
isNullable);
+        } else {
+            newType = super.createTypeWithNullability(relDataType, isNullable);
+        }
+
+        return canonize(newType);
+    }
+
+    /**
+     * Determines the least restrictive type of the given types.
+     *
+     * <p>Identical input types are resolved directly; otherwise the parent factory decides.
+     *
+     * @param types candidate types
+     * @return the least restrictive type, or {@code null} if there is none or if it would be the
+     *     NULL type
+     */
+    @Override
+    public RelDataType leastRestrictive(List<RelDataType> types) {
+        final RelDataType candidate =
+                resolveAllIdenticalTypes(types).orElseGet(() -> super.leastRestrictive(types));
+        // NULL is reserved for untyped literals only
+        final boolean usable =
+                candidate != null && candidate.getSqlTypeName() != SqlTypeName.NULL;
+        return usable ? candidate : null;
+    }
+
+    /**
+     * Resolves a common type when all given types are equal.
+     *
+     * @param types candidate types; the first element is used as the reference
+     * @return the first type with merged nullability if all types are equal, otherwise an empty
+     *     optional
+     * @throws TableException if the types differ and one of them has SQL type name {@code ANY}
+     */
+    private Optional<RelDataType> resolveAllIdenticalTypes(List<RelDataType> types) {
+        final RelDataType reference = types.get(0);
+
+        for (RelDataType candidate : types) {
+            if (candidate.equals(reference)) {
+                continue;
+            }
+            // types are not all the same
+            for (RelDataType other : types) {
+                if (other.getSqlTypeName() == SqlTypeName.ANY) {
+                    // one of the types was RAW.
+                    // we cannot generate a common type if it differs from other types.
+                    throw new TableException(
+                            "Generic RAW types must have a common type information.");
+                }
+            }
+            // cannot resolve a common type for different input types
+            return Optional.empty();
+        }
+
+        // all types are the same: the result is nullable as soon as one input is nullable
+        boolean anyNullable = false;
+        for (RelDataType sqlType : types) {
+            if (sqlType.isNullable() || sqlType.getSqlTypeName() == SqlTypeName.NULL) {
+                anyNullable = true;
+                break;
+            }
+        }
+        return Optional.of(createTypeWithNullability(reference, anyNullable));
+    }
+
+    /** Returns the charset named by {@code ConversionUtil.NATIVE_UTF16_CHARSET_NAME}. */
+    @Override
+    public Charset getDefaultCharset() {
+        final String charsetName = ConversionUtil.NATIVE_UTF16_CHARSET_NAME;
+        return Charset.forName(charsetName);
+    }
+
+    /**
+     * Builds a struct type from parallel arrays of field names and Flink logical types.
+     *
+     * @param fieldNames names of the fields, positionally aligned with {@code fieldTypes}
+     * @param fieldTypes Flink {@link LogicalType} of every field
+     * @param structKind name resolution policy, see {@link StructKind}
+     * @return struct type containing the given fields in order
+     */
+    private RelDataType buildStructType(
+            String[] fieldNames, LogicalType[] fieldTypes, StructKind structKind) {
+        final RelDataTypeFactory.Builder structBuilder = builder();
+        structBuilder.kind(structKind);
+
+        int position = 0;
+        for (LogicalType logicalFieldType : fieldTypes) {
+            final String name = fieldNames[position++];
+            final RelDataType relFieldType = createFieldTypeFromLogicalType(logicalFieldType);
+            // the NULL type must not be used as a field type
+            checkForNullType(relFieldType);
+            structBuilder.add(name, relFieldType);
+        }
+        return structBuilder.build();
+    }
+
+    /**
+     * Creates a Calcite field type in a table schema from a {@link LogicalType}. It uses
+     * PEEK_FIELDS_NO_EXPAND when the type is a nested struct type (Flink {@link RowType}).
+     *
+     * <p>Timestamp types are mapped according to their {@link TimestampKind}: regular timestamps
+     * become SQL timestamp types, {@code ROWTIME}/{@code PROCTIME} kinds become time indicator
+     * types. All other types are created once via {@code newRelDataType} and then served from an
+     * internal cache. The nullability of the logical type is applied to the result.
+     *
+     * @param type Flink logical type.
+     * @return calcite {@link RelDataType}.
+     * @throws TableException if a {@link TimestampType} has kind {@code PROCTIME} or a timestamp
+     *     kind is not supported
+     */
+    public RelDataType createFieldTypeFromLogicalType(LogicalType type) {
+
+        // The kind of a timestamp type does not affect its hashCode and equals, so timestamp
+        // types cannot be put into seenTypes.
+        final RelDataType relType;
+        switch (type.getTypeRoot()) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final TimestampType timestampType = (TimestampType) type;
+                switch (timestampType.getKind()) {
+                    case ROWTIME:
+                        relType = 
createRowtimeIndicatorType(type.isNullable(), false);
+                        break;
+                    case REGULAR:
+                        relType = createSqlType(TIMESTAMP, 
timestampType.getPrecision());
+                        break;
+                    case PROCTIME:
+                        throw new TableException(
+                                "Processing time indicator only supports"
+                                        + " LocalZonedTimestampType, but 
actual is TimestampType."
+                                        + " This is a bug in planner, please 
file an issue.");
+                    default:
+                        throw new TableException(
+                                "Unsupported TimestampKind: " + 
timestampType.getKind());
+                }
+                break;
+
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final LocalZonedTimestampType lzTs = (LocalZonedTimestampType) 
type;
+                switch (lzTs.getKind()) {
+                    case PROCTIME:
+                        relType = 
createProctimeIndicatorType(type.isNullable());
+                        break;
+                    case ROWTIME:
+                        relType = 
createRowtimeIndicatorType(type.isNullable(), true);
+                        break;
+                    case REGULAR:
+                        relType =
+                                createSqlType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, 
lzTs.getPrecision());
+                        break;
+                    default:
+                        throw new TableException("Unsupported TimestampKind: " 
+ lzTs.getKind());
+                }
+                break;
+            default:
+                // NOTE: newRelDataType may call back into this method for nested types, so
+                // the cache is accessed with a plain get followed by a put.
+                final RelDataType seenType = seenTypes.get(type);
+                if (seenType != null) {
+                    relType = seenType;
+                } else {
+                    final RelDataType refType = newRelDataType(type);
+                    seenTypes.put(type, refType);
+                    relType = refType;
+                }
+        }
+
+        // apply the nullability of the logical type to the (possibly cached) Calcite type
+        return createTypeWithNullability(relType, type.isNullable());
+    }
+
+    /**
+     * Creates a new Calcite type for the given logical type, dispatching on its type root.
+     *
+     * <p>Nested types (row fields, array/multiset elements, map keys and values) are converted
+     * via {@code createFieldTypeFromLogicalType}. Legacy type information types with root
+     * {@code STRUCTURED_TYPE} or {@code RAW} are first converted with {@code
+     * PlannerTypeUtils.removeLegacyTypes}.
+     *
+     * @param logicalType Flink logical type to convert
+     * @return the corresponding Calcite type
+     * @throws TableException if the type is not supported
+     */
+    private RelDataType newRelDataType(LogicalType logicalType) {
+        switch (logicalType.getTypeRoot()) {
+            case NULL:
+                return createSqlType(SqlTypeName.NULL);
+            case BOOLEAN:
+                return createSqlType(SqlTypeName.BOOLEAN);
+            case TINYINT:
+                return createSqlType(SqlTypeName.TINYINT);
+            case SMALLINT:
+                return createSqlType(SqlTypeName.SMALLINT);
+            case INTEGER:
+                return createSqlType(SqlTypeName.INTEGER);
+            case BIGINT:
+                return createSqlType(SqlTypeName.BIGINT);
+            case FLOAT:
+                return createSqlType(SqlTypeName.FLOAT);
+            case DOUBLE:
+                return createSqlType(SqlTypeName.DOUBLE);
+            case VARCHAR:
+                return createSqlType(SqlTypeName.VARCHAR, ((VarCharType) 
logicalType).getLength());
+            case CHAR:
+                return createSqlType(SqlTypeName.CHAR, ((CharType) 
logicalType).getLength());
+
+            // temporal types
+            case DATE:
+                return createSqlType(SqlTypeName.DATE);
+            case TIME_WITHOUT_TIME_ZONE:
+                return createSqlType(SqlTypeName.TIME, ((TimeType) 
logicalType).getPrecision());
+
+            // interval types
+            case INTERVAL_YEAR_MONTH:
+                return createSqlIntervalType(
+                        new SqlIntervalQualifier(TimeUnit.YEAR, 
TimeUnit.MONTH, SqlParserPos.ZERO));
+            case INTERVAL_DAY_TIME:
+                return createSqlIntervalType(
+                        new SqlIntervalQualifier(TimeUnit.DAY, 
TimeUnit.SECOND, SqlParserPos.ZERO));
+
+            case BINARY:
+                return createSqlType(SqlTypeName.BINARY, ((BinaryType) 
logicalType).getLength());
+            case VARBINARY:
+                return createSqlType(VARBINARY, ((VarBinaryType) 
logicalType).getLength());
+
+            case DECIMAL:
+                if (logicalType instanceof DecimalType) {
+                    DecimalType decimalType = (DecimalType) logicalType;
+                    return createSqlType(
+                            DECIMAL, decimalType.getPrecision(), 
decimalType.getScale());
+                } else if (logicalType instanceof LegacyTypeInformationType) {
+                    // legacy BigDecimal type information is mapped to DECIMAL(38, 18)
+                    LegacyTypeInformationType<?> legacyType =
+                            (LegacyTypeInformationType<?>) logicalType;
+                    if (legacyType.getTypeInformation() == 
BasicTypeInfo.BIG_DEC_TYPE_INFO) {
+                        return createSqlType(DECIMAL, 38, 18);
+                    }
+                }
+                throw new TableException("Type is not supported: " + 
logicalType);
+            case ROW:
+                final RowType rowType = (RowType) logicalType;
+                return buildStructType(
+                        rowType.getFieldNames().toArray(new String[0]),
+                        rowType.getChildren().toArray(new LogicalType[0]),
+                        // fields are not expanded in "SELECT *"
+                        StructKind.PEEK_FIELDS_NO_EXPAND);
+
+            case STRUCTURED_TYPE:
+                if (logicalType instanceof StructuredType) {
+                    return StructuredRelDataType.create(this, (StructuredType) 
logicalType);
+                } else if (logicalType instanceof LegacyTypeInformationType) {
+                    return createFieldTypeFromLogicalType(
+                            PlannerTypeUtils.removeLegacyTypes(logicalType));
+                }
+                throw new TableException("Type is not supported: " + 
logicalType);
+            case ARRAY:
+                final ArrayType arrayType = (ArrayType) logicalType;
+                return createArrayType(
+                        
createFieldTypeFromLogicalType(arrayType.getElementType()), -1);
+
+            case MAP:
+                final MapType mapType = (MapType) logicalType;
+                return createMapType(
+                        createFieldTypeFromLogicalType(mapType.getKeyType()),
+                        
createFieldTypeFromLogicalType(mapType.getValueType()));
+
+            case MULTISET:
+                final MultisetType multisetType = (MultisetType) logicalType;
+                return createMultisetType(
+                        
createFieldTypeFromLogicalType(multisetType.getElementType()), -1);
+
+            case RAW:
+                if (logicalType instanceof RawType) {
+                    return new RawRelDataType((RawType<?>) logicalType);
+                } else if (logicalType instanceof TypeInformationRawType) {
+                    return new GenericRelDataType(
+                            (TypeInformationRawType<?>) logicalType, true, 
getTypeSystem());
+                } else if (logicalType instanceof LegacyTypeInformationType) {
+                    return createFieldTypeFromLogicalType(
+                            PlannerTypeUtils.removeLegacyTypes(logicalType));
+                }
+                throw new TableException("Type is not supported: " + 
logicalType);
+
+            case SYMBOL:
+                return createSqlType(SqlTypeName.SYMBOL);
+
+            case DESCRIPTOR:
+                return createSqlType(SqlTypeName.COLUMN_LIST);
+
+            case VARIANT:
+                return createSqlType(SqlTypeName.VARIANT);
+
+            case BITMAP:
+                return new BitmapRelDataType((BitmapType) logicalType);
+
+            default:
+                throw new TableException("Type is not supported: " + 
logicalType);
+        }
+    }
+
+    /**
+     * This is a safety check in case the null type ends up in the type 
factory for other use cases
+     * than untyped NULL literals.
+     */
+    private void checkForNullType(RelDataType... childTypes) {
+        for (RelDataType childType : childTypes) {
+
+            if (childType.getSqlTypeName() == SqlTypeName.NULL) {
+                throw new ValidationException(
+                        "The null type is reserved for representing untyped 
NULL literals. It should not be "
+                                + "used in constructed types. Please cast NULL 
literals to a more explicit type.");
+            }
+        }
+    }
+
+    /**
+     * Creates an indicator type for processing-time, but with similar properties as SQL timestamp.
+     *
+     * @param isNullable nullability of the indicator type
+     * @return canonized processing-time indicator based on a {@link LocalZonedTimestampType} of
+     *     precision 3
+     */
+    public RelDataType createProctimeIndicatorType(boolean isNullable) {
+        final LogicalType timestampLtz = new LocalZonedTimestampType(isNullable, 3);
+        final BasicSqlType originalType =
+                (BasicSqlType) createFieldTypeFromLogicalType(timestampLtz);
+        final boolean isEventTime = false;
+        return canonize(
+                new TimeIndicatorRelDataType(
+                        getTypeSystem(), originalType, isNullable, isEventTime));
+    }
+
+    /**
+     * Creates an indicator type for event-time, but with similar properties as SQL timestamp.
+     *
+     * @param isNullable nullability of the indicator type
+     * @param isTimestampLtz whether the indicator is based on a {@link LocalZonedTimestampType}
+     *     instead of a {@link TimestampType}; both use precision 3
+     * @return canonized event-time indicator type
+     */
+    public RelDataType createRowtimeIndicatorType(boolean isNullable, boolean isTimestampLtz) {
+        final LogicalType baseType;
+        if (isTimestampLtz) {
+            baseType = new LocalZonedTimestampType(isNullable, 3);
+        } else {
+            baseType = new TimestampType(isNullable, 3);
+        }
+        final BasicSqlType originalType = (BasicSqlType) createFieldTypeFromLogicalType(baseType);
+        final boolean isEventTime = true;
+        return canonize(
+                new TimeIndicatorRelDataType(
+                        getTypeSystem(), originalType, isNullable, isEventTime));
+    }
+
+    /**
+     * Converts a Calcite struct type into a Flink {@link RowType}.
+     *
+     * @param relType Calcite type; must be a struct
+     * @return row type with the same field names and converted field types
+     */
+    public static RowType toLogicalRowType(RelDataType relType) {
+        Preconditions.checkArgument(relType.isStruct());
+        final LogicalType[] fieldTypes =
+                relType.getFieldList().stream()
+                        .map(field -> toLogicalType(field.getType()))
+                        .toArray(LogicalType[]::new);
+        final String[] fieldNames = relType.getFieldNames().toArray(new String[0]);
+        return RowType.of(fieldTypes, fieldNames);
+    }
+
+    /**
+     * Checks whether the logical type is a {@link TimestampType} of kind {@code ROWTIME} or a
+     * {@link LocalZonedTimestampType} of kind {@code PROCTIME}.
+     */
+    public static boolean isTimeIndicatorType(LogicalType logicalType) {
+        if (logicalType instanceof TimestampType) {
+            return ((TimestampType) logicalType).getKind() == TimestampKind.ROWTIME;
+        }
+        if (logicalType instanceof LocalZonedTimestampType) {
+            return ((LocalZonedTimestampType) logicalType).getKind() == TimestampKind.PROCTIME;
+        }
+        return false;
+    }
+
+    /** Checks whether the Calcite type is a {@link TimeIndicatorRelDataType}. */
+    public static boolean isTimeIndicatorType(RelDataType relDataType) {
+        final boolean isIndicator = relDataType instanceof TimeIndicatorRelDataType;
+        return isIndicator;
+    }
+
+    /** Checks whether the Calcite type is a time indicator that is flagged as event-time. */
+    public static boolean isRowtimeIndicatorType(RelDataType relDataType) {
+        if (!(relDataType instanceof TimeIndicatorRelDataType)) {
+            return false;
+        }
+        return ((TimeIndicatorRelDataType) relDataType).isEventTime();
+    }
+
+    /** Checks whether the Calcite type is a time indicator that is not flagged as event-time. */
+    public static boolean isProctimeIndicatorType(RelDataType relDataType) {
+        if (!(relDataType instanceof TimeIndicatorRelDataType)) {
+            return false;
+        }
+        return !((TimeIndicatorRelDataType) relDataType).isEventTime();
+    }
+
+    /**
+     * Checks whether the Calcite type is a time indicator whose original type is {@code
+     * TIMESTAMP_WITH_LOCAL_TIME_ZONE}.
+     */
+    public static boolean isTimestampLtzIndicatorType(RelDataType relDataType) {
+        if (!(relDataType instanceof TimeIndicatorRelDataType)) {
+            return false;
+        }
+        final TimeIndicatorRelDataType indicator = (TimeIndicatorRelDataType) relDataType;
+        final SqlTypeName originalName = indicator.originalType().getSqlTypeName();
+        return originalName.equals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+    }
+
+    /** Checks whether the type information is a time indicator not flagged as event-time. */
+    @Deprecated
+    public static boolean isProctimeIndicatorType(TypeInformation<?> typeInfo) {
+        if (!(typeInfo instanceof TimeIndicatorTypeInfo)) {
+            return false;
+        }
+        return !((TimeIndicatorTypeInfo) typeInfo).isEventTime();
+    }
+
+    /** Checks whether the type information is a time indicator flagged as event-time. */
+    @Deprecated
+    public static boolean isRowtimeIndicatorType(TypeInformation<?> typeInfo) {
+        if (!(typeInfo instanceof TimeIndicatorTypeInfo)) {
+            return false;
+        }
+        return ((TimeIndicatorTypeInfo) typeInfo).isEventTime();
+    }
+
+    /** Checks whether the type information is a {@link TimeIndicatorTypeInfo}. */
+    @Deprecated
+    public static boolean isTimeIndicatorType(TypeInformation<?> typeInfo) {
+        final boolean isIndicator = typeInfo instanceof TimeIndicatorTypeInfo;
+        return isIndicator;
+    }
+
+    /**
+     * Returns a projected {@link RelDataType} of the structure type.
+     *
+     * @param relType struct type to project
+     * @param selectedFields indices of the fields to keep, in output order
+     * @return struct type consisting of the selected fields
+     */
+    public RelDataType projectStructType(RelDataType relType, int[] selectedFields) {
+        final List<RelDataTypeField> projected =
+                Arrays.stream(selectedFields)
+                        .mapToObj(index -> relType.getFieldList().get(index))
+                        .collect(Collectors.toList());
+        return this.createStructType(projected);
+    }
+
+    /**
+     * Creates a table row type from the field names and field data types of a table schema.
+     *
+     * @param tableSchema schema to convert to Calcite's specific one
+     * @return a struct type with the schema's field names and the logical types of its fields
+     */
+    public RelDataType buildRelNodeRowType(TableSchema tableSchema) {
+        final String[] names = tableSchema.getFieldNames();
+        final DataType[] dataTypes = tableSchema.getFieldDataTypes();
+        final LogicalType[] logicalTypes = new LogicalType[dataTypes.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            logicalTypes[i] = dataTypes[i].getLogicalType();
+        }
+        return buildRelNodeRowType(names, logicalTypes);
+    }
+
+    /**
+     * Creates a table row type from the fields of the given {@link RowType}. Table row type is the
+     * table schema for a Calcite {@link RelNode}, see {@link RelNode#getRowType()}.
+     *
+     * <p>It uses {@link StructKind#FULLY_QUALIFIED} so that each field must be referenced
+     * explicitly.
+     *
+     * @param rowType row type whose field names and field types are used
+     * @return a table row type with the row's field names and field types
+     */
+    public RelDataType buildRelNodeRowType(RowType rowType) {
+        final List<RowType.RowField> rowFields = rowType.getFields();
+        final int arity = rowFields.size();
+        final String[] names = new String[arity];
+        final LogicalType[] types = new LogicalType[arity];
+        for (int i = 0; i < arity; i++) {
+            final RowType.RowField rowField = rowFields.get(i);
+            names[i] = rowField.getName();
+            types[i] = rowField.getType();
+        }
+        return buildStructType(names, types, StructKind.FULLY_QUALIFIED);
+    }
+
+    /**
+     * Creates a table row type with the given field names and field types. Table row type is the
+     * table schema for a Calcite {@link RelNode}, see {@link RelNode#getRowType()}.
+     *
+     * <p>It uses {@link StructKind#FULLY_QUALIFIED} so that each field must be referenced
+     * explicitly.
+     *
+     * @param fieldNames field names
+     * @param fieldTypes field types, every element is Flink's {@link LogicalType}
+     * @return a table row type with the input fieldNames, input fieldTypes.
+     */
+    public RelDataType buildRelNodeRowType(String[] fieldNames, LogicalType[] fieldTypes) {
+        final StructKind kind = StructKind.FULLY_QUALIFIED;
+        return buildStructType(fieldNames, fieldTypes, kind);
+    }
+
+    /**
+     * List-based variant: creates a table row type with the given field names and field types.
+     * Table row type is the table schema for a Calcite {@link RelNode}, see {@link
+     * RelNode#getRowType()}.
+     *
+     * <p>It uses {@link StructKind#FULLY_QUALIFIED} so that each field must be referenced
+     * explicitly.
+     *
+     * @param fieldNames field names
+     * @param fieldTypes field types, every element is Flink's {@link LogicalType}
+     * @return a table row type with the input fieldNames, input fieldTypes.
+     */
+    public RelDataType buildRelNodeRowType(List<String> fieldNames, List<LogicalType> fieldTypes) {
+        final String[] names = fieldNames.toArray(new String[0]);
+        final LogicalType[] types = fieldTypes.toArray(new LogicalType[0]);
+        return buildStructType(names, types, StructKind.FULLY_QUALIFIED);
+    }
+
+    /**
+     * Scala {@link Seq}-based variant: converts both sequences to Java lists and delegates to
+     * {@link #buildRelNodeRowType(List, List)}.
+     *
+     * @param fieldNames field names
+     * @param fieldTypes field types, every element is Flink's {@link LogicalType}
+     * @return a table row type with the input fieldNames, input fieldTypes.
+     */
+    public RelDataType buildRelNodeRowType(Seq<String> fieldNames, Seq<LogicalType> fieldTypes) {
+        final List<String> names = JavaConverters.seqAsJavaList(fieldNames);
+        final List<LogicalType> types = JavaConverters.seqAsJavaList(fieldTypes);
+        return buildRelNodeRowType(names, types);
+    }
+
+    /**
+     * Converts a Calcite {@link RelDataType} into a Flink {@link LogicalType}, carrying over the
+     * nullability of the Calcite type.
+     *
+     * @param relType Calcite type to convert
+     * @return the corresponding logical type, copied with {@code relType.isNullable()}
+     */
+    public static LogicalType toLogicalType(RelDataType relType) {
+        final LogicalType withoutNullability = toLogicalTypeWithoutNullability(relType);
+        return withoutNullability.copy(relType.isNullable());
+    }
+
+    /**
+     * Converts a Calcite struct type into a {@link TableSchema}.
+     *
+     * <p>Every field type goes through {@link #toLogicalType(RelDataType)} and is then turned into
+     * a {@link DataType} by {@link LogicalTypeDataTypeConverter}.
+     *
+     * @param relDataType struct type whose fields become the schema columns
+     * @return a schema with the struct's field names and converted field types
+     */
+    public static TableSchema toTableSchema(RelDataType relDataType) {
+        final String[] names = relDataType.getFieldNames().toArray(new String[0]);
+        final List<RelDataTypeField> fieldList = relDataType.getFieldList();
+        final DataType[] dataTypes = new DataType[fieldList.size()];
+        for (int i = 0; i < dataTypes.length; i++) {
+            final LogicalType logicalType = toLogicalType(fieldList.get(i).getType());
+            dataTypes[i] = LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(logicalType);
+        }
+        return TableSchema.builder().fields(names, dataTypes).build();
+    }
+
+    /**
+     * Creates a struct type from the schema returned by {@link
+     * TableSchemaUtils#getPhysicalSchema(TableSchema)}, using FlinkTypeFactory.
+     *
+     * @param tableSchema schema to convert to Calcite's specific one
+     * @return a struct type with the physical schema's field names and field types.
+     */
+    public RelDataType buildPhysicalRelNodeRowType(TableSchema tableSchema) {
+        final TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(tableSchema);
+        return buildRelNodeRowType(physicalSchema);
+    }
+
+    /**
+     * Creates a struct type from the schema returned by {@link
+     * TableSchemaUtils#getPersistedSchema(TableSchema)}, using FlinkTypeFactory.
+     *
+     * @param tableSchema schema to convert to Calcite's specific one
+     * @return a struct type with the persisted schema's field names and field types.
+     */
+    public RelDataType buildPersistedRelNodeRowType(TableSchema tableSchema) {
+        final TableSchema persistedSchema = TableSchemaUtils.getPersistedSchema(tableSchema);
+        return buildRelNodeRowType(persistedSchema);
+    }
+
+    /**
+     * Maps a Calcite {@link RelDataType} to the matching Flink {@link LogicalType}, dispatching on
+     * the type's {@link SqlTypeName}.
+     *
+     * <p>The nullability of {@code relDataType} itself is not applied here; {@link
+     * #toLogicalType(RelDataType)} copies it onto the result. Element, key and value types of
+     * collection types are converted through {@link #toLogicalType(RelDataType)} and therefore do
+     * keep their nullability.
+     *
+     * @param relDataType Calcite type to convert
+     * @return the corresponding logical type
+     * @throws TableException if the type has no Flink counterpart, if a day-time interval has a
+     *     precision greater than 3, or if a processing-time indicator is backed by TIMESTAMP
+     */
+    private static LogicalType toLogicalTypeWithoutNullability(RelDataType relDataType) {
+        switch (relDataType.getSqlTypeName()) {
+            case BOOLEAN:
+                return new BooleanType();
+            case TINYINT:
+                return new TinyIntType();
+            case SMALLINT:
+                return new SmallIntType();
+            case INTEGER:
+                return new IntType();
+            case BIGINT:
+                return new BigIntType();
+            case FLOAT:
+                return new FloatType();
+            case DOUBLE:
+                return new DoubleType();
+
+            // a precision of 0 is mapped to the dedicated empty-literal variant of the type
+            case CHAR:
+                return relDataType.getPrecision() == 0
+                        ? CharType.ofEmptyLiteral()
+                        : new CharType(relDataType.getPrecision());
+            case VARCHAR:
+                return relDataType.getPrecision() == 0
+                        ? VarCharType.ofEmptyLiteral()
+                        : new VarCharType(relDataType.getPrecision());
+            case BINARY:
+                return relDataType.getPrecision() == 0
+                        ? BinaryType.ofEmptyLiteral()
+                        : new BinaryType(relDataType.getPrecision());
+            case VARBINARY:
+                return relDataType.getPrecision() == 0
+                        ? VarBinaryType.ofEmptyLiteral()
+                        : new VarBinaryType(relDataType.getPrecision());
+
+            case DECIMAL:
+                return new DecimalType(relDataType.getPrecision(), relDataType.getScale());
+
+            case TIMESTAMP:
+                if (!(relDataType instanceof TimeIndicatorRelDataType)) {
+                    return new TimestampType(relDataType.getPrecision());
+                }
+                if (((TimeIndicatorRelDataType) relDataType).isEventTime()) {
+                    return new TimestampType(true, TimestampKind.ROWTIME, 3);
+                }
+                throw new TableException(
+                        "Processing time indicator only supports"
+                                + " LocalZonedTimestampType, but actual is TimestampType."
+                                + " This is a bug in planner, please file an issue.");
+
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (!(relDataType instanceof TimeIndicatorRelDataType)) {
+                    return new LocalZonedTimestampType(relDataType.getPrecision());
+                }
+                return new LocalZonedTimestampType(
+                        true,
+                        ((TimeIndicatorRelDataType) relDataType).isEventTime()
+                                ? TimestampKind.ROWTIME
+                                : TimestampKind.PROCTIME,
+                        3);
+
+            case DATE:
+                return new DateType();
+            case TIME:
+                return new TimeType(relDataType.getPrecision());
+
+            case INTERVAL_YEAR:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_MONTH:
+                return DataTypes.INTERVAL(DataTypes.MONTH()).getLogicalType();
+
+            case INTERVAL_DAY:
+            case INTERVAL_DAY_HOUR:
+            case INTERVAL_DAY_MINUTE:
+            case INTERVAL_DAY_SECOND:
+            case INTERVAL_HOUR:
+            case INTERVAL_HOUR_MINUTE:
+            case INTERVAL_HOUR_SECOND:
+            case INTERVAL_MINUTE:
+            case INTERVAL_MINUTE_SECOND:
+            case INTERVAL_SECOND:
+                if (relDataType.getPrecision() > 3) {
+                    throw new TableException(
+                            "DAY_INTERVAL_TYPES precision is not supported: "
+                                    + relDataType.getPrecision());
+                }
+                return DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType();
+
+            case NULL:
+                return new NullType();
+
+            case SYMBOL:
+                return new SymbolType();
+
+            case COLUMN_LIST:
+                return new DescriptorType();
+
+            // extract encapsulated Type
+            case ANY:
+                if (!(relDataType instanceof GenericRelDataType)) {
+                    throw unsupportedRelDataType(relDataType);
+                }
+                return ((GenericRelDataType) relDataType).genericType();
+
+            case ROW:
+                if (!(relDataType instanceof RelRecordType)) {
+                    throw unsupportedRelDataType(relDataType);
+                }
+                return toLogicalRowType(relDataType);
+
+            case STRUCTURED:
+                if (!(relDataType instanceof StructuredRelDataType)) {
+                    throw unsupportedRelDataType(relDataType);
+                }
+                return ((StructuredRelDataType) relDataType).getStructuredType();
+
+            case MULTISET:
+                return new MultisetType(toLogicalType(relDataType.getComponentType()));
+
+            case ARRAY:
+                return new ArrayType(toLogicalType(relDataType.getComponentType()));
+
+            case MAP:
+                if (!(relDataType instanceof MapSqlType)) {
+                    throw unsupportedRelDataType(relDataType);
+                }
+                final MapSqlType mapRelDataType = (MapSqlType) relDataType;
+                return new MapType(
+                        toLogicalType(mapRelDataType.getKeyType()),
+                        toLogicalType(mapRelDataType.getValueType()));
+
+            // CURSOR for UDTF case, whose type info will never be used, just a placeholder
+            case CURSOR:
+                // diamond instead of the raw type to avoid an unchecked raw-type instantiation
+                return new TypeInformationRawType<>(new NothingTypeInfo());
+
+            case VARIANT:
+                return new VariantType();
+
+            case OTHER:
+                if (relDataType instanceof RawRelDataType) {
+                    return ((RawRelDataType) relDataType).getRawType();
+                }
+                if (relDataType instanceof BitmapRelDataType) {
+                    return ((BitmapRelDataType) relDataType).getBitmapType();
+                }
+                throw unsupportedRelDataType(relDataType);
+
+            default:
+                throw unsupportedRelDataType(relDataType);
+        }
+    }
+
+    /** Builds the exception reported for Calcite types that have no Flink counterpart. */
+    private static TableException unsupportedRelDataType(RelDataType relDataType) {
+        return new TableException("Type is not supported: " + relDataType);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
index d956f0a2990..2f8b61824b6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/runtimefilter/BatchPhysicalLocalRuntimeFilterBuilder.java
@@ -34,7 +34,6 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.type.RelDataType;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -81,8 +80,8 @@ public class BatchPhysicalLocalRuntimeFilterBuilder extends 
SingleRel implements
     protected RelDataType deriveRowType() {
         return ((FlinkTypeFactory) getCluster().getTypeFactory())
                 .buildRelNodeRowType(
-                        Arrays.asList("actualRowCount", "filter"),
-                        Arrays.asList(new IntType(), new VarBinaryType()));
+                        List.of("actualRowCount", "filter"),
+                        List.of(new IntType(), new VarBinaryType()));
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
deleted file mode 100644
index e8e88833294..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ /dev/null
@@ -1,723 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.calcite
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NothingTypeInfo, 
TypeInformation}
-import org.apache.flink.table.api.{DataTypes, TableException, 
ValidationException}
-import org.apache.flink.table.calcite.ExtendedRelTypeFactory
-import org.apache.flink.table.legacy.api.TableSchema
-import org.apache.flink.table.legacy.types.logical.TypeInformationRawType
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
-import org.apache.flink.table.planner.plan.schema._
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.table.runtime.types.{LogicalTypeDataTypeConverter, 
PlannerTypeUtils}
-import org.apache.flink.table.types.logical._
-import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import org.apache.flink.table.utils.TableSchemaUtils
-import org.apache.flink.types.Nothing
-import org.apache.flink.util.Preconditions.checkArgument
-
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.`type`.{BasicSqlType, MapSqlType, SqlTypeName}
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.util.ConversionUtil
-
-import java.nio.charset.Charset
-import java.util
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * Flink specific type factory that represents the interface between Flink's 
[[LogicalType]] and
- * Calcite's [[RelDataType]].
- */
-class FlinkTypeFactory(
-    classLoader: ClassLoader,
-    typeSystem: RelDataTypeSystem = FlinkTypeSystem.INSTANCE)
-  extends JavaTypeFactoryImpl(typeSystem)
-  with ExtendedRelTypeFactory {
-
-  private val seenTypes = mutable.HashMap[LogicalType, RelDataType]()
-
-  /**
-   * Create a calcite field type in table schema from [[LogicalType]]. It use 
PEEK_FIELDS_NO_EXPAND
-   * when type is a nested struct type (Flink [[RowType]]).
-   *
-   * @param t
-   *   flink logical type.
-   * @return
-   *   calcite [[RelDataType]].
-   */
-  def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
-    def newRelDataType(): RelDataType = t.getTypeRoot match {
-      case LogicalTypeRoot.NULL => createSqlType(NULL)
-      case LogicalTypeRoot.BOOLEAN => createSqlType(BOOLEAN)
-      case LogicalTypeRoot.TINYINT => createSqlType(TINYINT)
-      case LogicalTypeRoot.SMALLINT => createSqlType(SMALLINT)
-      case LogicalTypeRoot.INTEGER => createSqlType(INTEGER)
-      case LogicalTypeRoot.BIGINT => createSqlType(BIGINT)
-      case LogicalTypeRoot.FLOAT => createSqlType(FLOAT)
-      case LogicalTypeRoot.DOUBLE => createSqlType(DOUBLE)
-      case LogicalTypeRoot.VARCHAR => createSqlType(VARCHAR, 
t.asInstanceOf[VarCharType].getLength)
-      case LogicalTypeRoot.CHAR => createSqlType(CHAR, 
t.asInstanceOf[CharType].getLength)
-
-      // temporal types
-      case LogicalTypeRoot.DATE => createSqlType(DATE)
-      case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE =>
-        createSqlType(TIME, t.asInstanceOf[TimeType].getPrecision)
-
-      // interval types
-      case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
-        createSqlIntervalType(
-          new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, 
SqlParserPos.ZERO))
-      case LogicalTypeRoot.INTERVAL_DAY_TIME =>
-        createSqlIntervalType(
-          new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, 
SqlParserPos.ZERO))
-
-      case LogicalTypeRoot.BINARY => createSqlType(BINARY, 
t.asInstanceOf[BinaryType].getLength)
-      case LogicalTypeRoot.VARBINARY =>
-        createSqlType(VARBINARY, t.asInstanceOf[VarBinaryType].getLength)
-
-      case LogicalTypeRoot.DECIMAL =>
-        t match {
-          case decimalType: DecimalType =>
-            createSqlType(DECIMAL, decimalType.getPrecision, 
decimalType.getScale)
-          case legacyType: LegacyTypeInformationType[_]
-              if legacyType.getTypeInformation == 
BasicTypeInfo.BIG_DEC_TYPE_INFO =>
-            createSqlType(DECIMAL, 38, 18)
-        }
-
-      case LogicalTypeRoot.ROW =>
-        val rowType = t.asInstanceOf[RowType]
-        buildStructType(
-          rowType.getFieldNames,
-          rowType.getChildren,
-          // fields are not expanded in "SELECT *"
-          StructKind.PEEK_FIELDS_NO_EXPAND)
-
-      case LogicalTypeRoot.STRUCTURED_TYPE =>
-        t match {
-          case structuredType: StructuredType => 
StructuredRelDataType.create(this, structuredType)
-          case legacyTypeInformationType: LegacyTypeInformationType[_] =>
-            createFieldTypeFromLogicalType(
-              PlannerTypeUtils.removeLegacyTypes(legacyTypeInformationType))
-        }
-
-      case LogicalTypeRoot.ARRAY =>
-        val arrayType = t.asInstanceOf[ArrayType]
-        
createArrayType(createFieldTypeFromLogicalType(arrayType.getElementType), -1)
-
-      case LogicalTypeRoot.MAP =>
-        val mapType = t.asInstanceOf[MapType]
-        createMapType(
-          createFieldTypeFromLogicalType(mapType.getKeyType),
-          createFieldTypeFromLogicalType(mapType.getValueType))
-
-      case LogicalTypeRoot.MULTISET =>
-        val multisetType = t.asInstanceOf[MultisetType]
-        
createMultisetType(createFieldTypeFromLogicalType(multisetType.getElementType), 
-1)
-
-      case LogicalTypeRoot.RAW =>
-        t match {
-          case rawType: RawType[_] =>
-            new RawRelDataType(rawType)
-          case genericType: TypeInformationRawType[_] =>
-            new GenericRelDataType(genericType, true, getTypeSystem)
-          case legacyType: LegacyTypeInformationType[_] =>
-            
createFieldTypeFromLogicalType(PlannerTypeUtils.removeLegacyTypes(legacyType))
-        }
-
-      case LogicalTypeRoot.SYMBOL =>
-        createSqlType(SqlTypeName.SYMBOL)
-
-      case LogicalTypeRoot.DESCRIPTOR =>
-        createSqlType(SqlTypeName.COLUMN_LIST)
-
-      case LogicalTypeRoot.VARIANT =>
-        createSqlType(SqlTypeName.VARIANT)
-
-      case LogicalTypeRoot.BITMAP =>
-        new BitmapRelDataType(t.asInstanceOf[BitmapType])
-
-      case _ @t =>
-        throw new TableException(s"Type is not supported: $t")
-    }
-
-    // Kind in TimestampType do not affect the hashcode and equals, So we 
can't put it to seenTypes
-    val relType = t.getTypeRoot match {
-      case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
-        val timestampType = t.asInstanceOf[TimestampType]
-        timestampType.getKind match {
-          case TimestampKind.ROWTIME => 
createRowtimeIndicatorType(t.isNullable, false)
-          case TimestampKind.REGULAR => createSqlType(TIMESTAMP, 
timestampType.getPrecision)
-          case TimestampKind.PROCTIME =>
-            throw new TableException(
-              s"Processing time indicator only supports" +
-                s" LocalZonedTimestampType, but actual is TimestampType." +
-                s" This is a bug in planner, please file an issue.")
-        }
-      case LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-        val lzTs = t.asInstanceOf[LocalZonedTimestampType]
-        lzTs.getKind match {
-          case TimestampKind.PROCTIME => 
createProctimeIndicatorType(t.isNullable)
-          case TimestampKind.ROWTIME => 
createRowtimeIndicatorType(t.isNullable, true)
-          case TimestampKind.REGULAR =>
-            createSqlType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, lzTs.getPrecision)
-        }
-      case _ =>
-        seenTypes.get(t) match {
-          case Some(retType: RelDataType) => retType
-          case None =>
-            val refType = newRelDataType()
-            seenTypes.put(t, refType)
-            refType
-        }
-    }
-
-    createTypeWithNullability(relType, t.isNullable)
-  }
-
-  /** Creates a indicator type for processing-time, but with similar 
properties as SQL timestamp. */
-  def createProctimeIndicatorType(isNullable: Boolean): RelDataType = {
-    val originalType = createFieldTypeFromLogicalType(new 
LocalZonedTimestampType(isNullable, 3))
-    canonize(
-      new TimeIndicatorRelDataType(
-        getTypeSystem,
-        originalType.asInstanceOf[BasicSqlType],
-        isNullable,
-        isEventTime = false))
-  }
-
-  /** Creates a indicator type for event-time, but with similar properties as 
SQL timestamp. */
-  def createRowtimeIndicatorType(isNullable: Boolean, isTimestampLtz: 
Boolean): RelDataType = {
-    val originalType = if (isTimestampLtz) {
-      createFieldTypeFromLogicalType(new LocalZonedTimestampType(isNullable, 
3))
-    } else {
-      createFieldTypeFromLogicalType(new TimestampType(isNullable, 3))
-    }
-
-    canonize(
-      new TimeIndicatorRelDataType(
-        getTypeSystem,
-        originalType.asInstanceOf[BasicSqlType],
-        isNullable,
-        isEventTime = true))
-  }
-
-  /**
-   * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory
-   *
-   * @param tableSchema
-   *   schema to convert to Calcite's specific one
-   * @return
-   *   a struct type with the input fieldNames, input fieldTypes, and system 
fields
-   */
-  def buildRelNodeRowType(tableSchema: TableSchema): RelDataType = {
-    buildRelNodeRowType(
-      tableSchema.getFieldNames,
-      tableSchema.getFieldDataTypes.map(_.getLogicalType))
-  }
-
-  /**
-   * Creates a table row type with the given field names and field types. 
Table row type is table
-   * schema for Calcite [[RelNode]]. See [[RelNode#getRowType]].
-   *
-   * It uses [[StructKind#FULLY_QUALIFIED]] to let each field must be 
referenced explicitly.
-   *
-   * @param fieldNames
-   *   field names
-   * @param fieldTypes
-   *   field types, every element is Flink's [[LogicalType]]
-   * @return
-   *   a table row type with the input fieldNames, input fieldTypes.
-   */
-  def buildRelNodeRowType(
-      fieldNames: util.List[String],
-      fieldTypes: util.List[LogicalType]): RelDataType = {
-    buildStructType(fieldNames, fieldTypes, StructKind.FULLY_QUALIFIED)
-  }
-
-  /**
-   * Creates a table row type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory.
-   * Table row type is table schema for Calcite RelNode. See getRowType of 
[[RelNode]]. Use
-   * FULLY_QUALIFIED to let each field must be referenced explicitly.
-   *
-   * @param fieldNames
-   *   field names
-   * @param fieldTypes
-   *   field types, every element is Flink's [[LogicalType]]
-   * @return
-   *   a table row type with the input fieldNames, input fieldTypes.
-   */
-  def buildRelNodeRowType(fieldNames: Seq[String], fieldTypes: 
Seq[LogicalType]): RelDataType = {
-    buildStructType(fieldNames, fieldTypes, StructKind.FULLY_QUALIFIED)
-  }
-
-  /**
-   * Creates a table row type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory.
-   * Table row type is table schema for Calcite RelNode. See getRowType of 
[[RelNode]]. Use
-   * FULLY_QUALIFIED to let each field must be referenced explicitly.
-   */
-  def buildRelNodeRowType(rowType: RowType): RelDataType = {
-    val fields = rowType.getFields
-    buildStructType(fields.map(_.getName), fields.map(_.getType), 
StructKind.FULLY_QUALIFIED)
-  }
-
-  /**
-   * Creates a struct type with the physical columns using FlinkTypeFactory
-   *
-   * @param tableSchema
-   *   schema to convert to Calcite's specific one
-   * @return
-   *   a struct type with the input fieldNames, input fieldTypes.
-   */
-  def buildPhysicalRelNodeRowType(tableSchema: TableSchema): RelDataType = {
-    buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema))
-  }
-
-  /**
-   * Creats a struct type with the persisted columns using FlinkTypeFactory
-   *
-   * @param tableSchema
-   *   schema to convert to Calcite's specific one
-   * @return
-   *   a struct type with the input fieldsNames, input fieldTypes.
-   */
-  def buildPersistedRelNodeRowType(tableSchema: TableSchema): RelDataType = {
-    buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema))
-  }
-
-  /**
-   * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory.
-   *
-   * @param fieldNames
-   *   field names
-   * @param fieldTypes
-   *   field types, every element is Flink's [[LogicalType]].
-   * @param structKind
-   *   Name resolution policy. See more information in [[StructKind]].
-   * @return
-   *   a struct type with the input fieldNames, input fieldTypes.
-   */
-  private def buildStructType(
-      fieldNames: Seq[String],
-      fieldTypes: Seq[LogicalType],
-      structKind: StructKind): RelDataType = {
-    val b = builder
-    b.kind(structKind)
-    val fields = fieldNames.zip(fieldTypes)
-    fields.foreach {
-      case (fieldName, fieldType) =>
-        val fieldRelDataType = createFieldTypeFromLogicalType(fieldType)
-        checkForNullType(fieldRelDataType)
-        b.add(fieldName, fieldRelDataType)
-    }
-    b.build
-  }
-
-  /** Returns a projected [[RelDataType]] of the structure type. */
-  def projectStructType(relType: RelDataType, selectedFields: Array[Int]): 
RelDataType = {
-    this.createStructType(
-      selectedFields
-        .map(idx => relType.getFieldList.get(idx))
-        .toList
-        .asJava)
-  }
-
-  // 
----------------------------------------------------------------------------------------------
-
-  override def getJavaClass(`type`: RelDataType): java.lang.reflect.Type = {
-    if (`type`.getSqlTypeName == FLOAT) {
-      if (`type`.isNullable) {
-        classOf[java.lang.Float]
-      } else {
-        java.lang.Float.TYPE
-      }
-    } else {
-      super.getJavaClass(`type`)
-    }
-  }
-
-  override def createSqlType(typeName: SqlTypeName, precision: Int): 
RelDataType = {
-    // it might happen that inferred VARCHAR types overflow as we set them to 
Int.MaxValue
-    // Calcite will limit the length of the VARCHAR type to 65536.
-    if (typeName == VARCHAR && precision < 0) {
-      createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
-    } else {
-      super.createSqlType(typeName, precision)
-    }
-  }
-
-  override def createArrayType(elementType: RelDataType, maxCardinality: 
Long): RelDataType = {
-    // Just validate type, make sure there is a failure in validate phase.
-    checkForNullType(elementType)
-    toLogicalType(elementType)
-    super.createArrayType(elementType, maxCardinality)
-  }
-
-  override def createMapType(keyType: RelDataType, valueType: RelDataType): 
RelDataType = {
-    // Just validate type, make sure there is a failure in validate phase.
-    checkForNullType(keyType, valueType)
-    toLogicalType(keyType)
-    toLogicalType(valueType)
-    super.createMapType(keyType, valueType)
-  }
-
-  override def createMultisetType(elementType: RelDataType, maxCardinality: 
Long): RelDataType = {
-    // Just validate type, make sure there is a failure in validate phase.
-    checkForNullType(elementType)
-    toLogicalType(elementType)
-    super.createMultisetType(elementType, maxCardinality)
-  }
-
-  override def createRawType(className: String, serializerString: String): 
RelDataType = {
-    val rawType = RawType.restore(classLoader, className, serializerString)
-    val rawRelDataType = createFieldTypeFromLogicalType(rawType)
-    canonize(rawRelDataType)
-  }
-
-  override def createStructuredType(
-      className: String,
-      fieldTypes: util.List[RelDataType],
-      fieldNames: util.List[String]): RelDataType = {
-    val resolvedClass = toScala(StructuredType.resolveClass(classLoader, 
className))
-    val builder = resolvedClass
-      .map(StructuredType.newBuilder)
-      .getOrElse(StructuredType.newBuilder(className))
-
-    val relFields = 0
-      .until(fieldTypes.size())
-      .map(i => new RelDataTypeFieldImpl(fieldNames.get(i), i, 
fieldTypes.get(i)))
-      .map(_.asInstanceOf[RelDataTypeField])
-      .toList
-
-    val attributes =
-      relFields.map(f => new StructuredAttribute(f.getName, 
toLogicalType(f.getType)))
-    builder.attributes(attributes.asJava)
-
-    val relDataType = new StructuredRelDataType(builder.build(), 
relFields.asJava)
-    canonize(relDataType)
-  }
-
-  override def createBitmapType(): RelDataType = {
-    canonize(new BitmapRelDataType(new BitmapType()))
-  }
-
-  override def createSqlType(typeName: SqlTypeName): RelDataType = {
-    if (typeName == DECIMAL) {
-      // if we got here, the precision and scale are not specified, here we
-      // keep precision/scale in sync with our type system's default value,
-      // see DecimalType.USER_DEFAULT.
-      createSqlType(typeName, DecimalType.DEFAULT_PRECISION, 
DecimalType.DEFAULT_SCALE)
-    } else {
-      super.createSqlType(typeName)
-    }
-  }
-
-  override def createTypeWithNullability(
-      relDataType: RelDataType,
-      isNullable: Boolean): RelDataType = {
-
-    // nullability change not necessary
-    if (relDataType.isNullable == isNullable) {
-      return canonize(relDataType)
-    }
-
-    // change nullability
-    val newType = relDataType match {
-      case raw: RawRelDataType =>
-        raw.createWithNullability(isNullable)
-
-      case structured: StructuredRelDataType =>
-        structured.createWithNullability(isNullable)
-
-      case bitmap: BitmapRelDataType =>
-        bitmap.createWithNullability(isNullable)
-
-      case generic: GenericRelDataType =>
-        new GenericRelDataType(generic.genericType, isNullable, typeSystem)
-
-      case it: TimeIndicatorRelDataType =>
-        new TimeIndicatorRelDataType(
-          it.typeSystemField,
-          it.originalType,
-          isNullable,
-          it.isEventTime)
-
-      // for nested rows we keep the nullability property,
-      // top-level rows fall back to Calcite's default handling
-      case rt: RelRecordType if rt.getStructKind == 
StructKind.PEEK_FIELDS_NO_EXPAND =>
-        new RelRecordType(rt.getStructKind, rt.getFieldList, isNullable);
-
-      case _ =>
-        super.createTypeWithNullability(relDataType, isNullable)
-    }
-
-    canonize(newType)
-  }
-
-  override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
-    val leastRestrictive = resolveAllIdenticalTypes(types)
-      .getOrElse(super.leastRestrictive(types))
-    // NULL is reserved for untyped literals only
-    if (leastRestrictive == null || leastRestrictive.getSqlTypeName == NULL) {
-      null
-    } else {
-      leastRestrictive
-    }
-  }
-
-  private def resolveAllIdenticalTypes(types: util.List[RelDataType]): 
Option[RelDataType] = {
-    val allTypes = types.asScala
-
-    val head = allTypes.head
-    // check if all types are the same
-    if (allTypes.forall(_ == head)) {
-      // types are the same, check nullability
-      val nullable = allTypes
-        .exists(sqlType => sqlType.isNullable || sqlType.getSqlTypeName == 
SqlTypeName.NULL)
-      // return type with nullability
-      Some(createTypeWithNullability(head, nullable))
-    } else {
-      // types are not all the same
-      if (allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)) {
-        // one of the type was RAW.
-        // we cannot generate a common type if it differs from other types.
-        throw new TableException("Generic RAW types must have a common type 
information.")
-      } else {
-        // cannot resolve a common type for different input types
-        None
-      }
-    }
-  }
-
-  override def getDefaultCharset: Charset = {
-    Charset.forName(ConversionUtil.NATIVE_UTF16_CHARSET_NAME)
-  }
-
-  /**
-   * This is a safety check in case the null type ends up in the type factory 
for other use cases
-   * than untyped NULL literals.
-   */
-  private def checkForNullType(childTypes: RelDataType*): Unit = {
-    childTypes.foreach {
-      t =>
-        if (t.getSqlTypeName == NULL) {
-          throw new ValidationException(
-            "The null type is reserved for representing untyped NULL literals. 
It should not be " +
-              "used in constructed types. Please cast NULL literals to a more 
explicit type.")
-        }
-    }
-  }
-}
-
-object FlinkTypeFactory {
-
-  def isTimeIndicatorType(t: LogicalType): Boolean = t match {
-    case t: TimestampType if t.getKind == TimestampKind.ROWTIME => true
-    case ltz: LocalZonedTimestampType if ltz.getKind == TimestampKind.PROCTIME 
=> true
-    case _ => false
-  }
-
-  def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType 
match {
-    case _: TimeIndicatorRelDataType => true
-    case _ => false
-  }
-
-  def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType 
match {
-    case ti: TimeIndicatorRelDataType if ti.isEventTime => true
-    case _ => false
-  }
-
-  def isProctimeIndicatorType(relDataType: RelDataType): Boolean = relDataType 
match {
-    case ti: TimeIndicatorRelDataType if !ti.isEventTime => true
-    case _ => false
-  }
-
-  def isTimestampLtzIndicatorType(relDataType: RelDataType): Boolean =
-    relDataType match {
-      case ti: TimeIndicatorRelDataType
-          if 
ti.originalType.getSqlTypeName.equals(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 =>
-        true
-      case _ => false
-    }
-
-  @Deprecated
-  def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = 
typeInfo match {
-    case ti: TimeIndicatorTypeInfo if !ti.isEventTime => true
-    case _ => false
-  }
-
-  @Deprecated
-  def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo 
match {
-    case ti: TimeIndicatorTypeInfo if ti.isEventTime => true
-    case _ => false
-  }
-
-  @Deprecated
-  def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo 
match {
-    case ti: TimeIndicatorTypeInfo => true
-    case _ => false
-  }
-
-  def toLogicalType(relDataType: RelDataType): LogicalType = {
-    val logicalType = relDataType.getSqlTypeName match {
-      case BOOLEAN => new BooleanType()
-      case TINYINT => new TinyIntType()
-      case SMALLINT => new SmallIntType()
-      case INTEGER => new IntType()
-      case BIGINT => new BigIntType()
-      case FLOAT => new FloatType()
-      case DOUBLE => new DoubleType()
-      case CHAR =>
-        if (relDataType.getPrecision == 0) {
-          CharType.ofEmptyLiteral
-        } else {
-          new CharType(relDataType.getPrecision)
-        }
-      case VARCHAR =>
-        if (relDataType.getPrecision == 0) {
-          VarCharType.ofEmptyLiteral
-        } else {
-          new VarCharType(relDataType.getPrecision)
-        }
-      case BINARY =>
-        if (relDataType.getPrecision == 0) {
-          BinaryType.ofEmptyLiteral
-        } else {
-          new BinaryType(relDataType.getPrecision)
-        }
-      case VARBINARY =>
-        if (relDataType.getPrecision == 0) {
-          VarBinaryType.ofEmptyLiteral
-        } else {
-          new VarBinaryType(relDataType.getPrecision)
-        }
-      case DECIMAL => new DecimalType(relDataType.getPrecision, 
relDataType.getScale)
-
-      // time indicators
-      case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-        val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-        if (indicator.isEventTime) {
-          new TimestampType(true, TimestampKind.ROWTIME, 3)
-        } else {
-          throw new TableException(
-            s"Processing time indicator only supports" +
-              s" LocalZonedTimestampType, but actual is TimestampType." +
-              s" This is a bug in planner, please file an issue.")
-        }
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE if 
relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
-        val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
-        if (indicator.isEventTime) {
-          new LocalZonedTimestampType(true, TimestampKind.ROWTIME, 3)
-        } else {
-          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3)
-        }
-
-      // temporal types
-      case DATE => new DateType()
-      case TIME =>
-        new TimeType(relDataType.getPrecision)
-      case TIMESTAMP =>
-        new TimestampType(relDataType.getPrecision)
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-        new LocalZonedTimestampType(relDataType.getPrecision)
-      case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
-        DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
-      case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
-        if (relDataType.getPrecision > 3) {
-          throw new TableException(
-            s"DAY_INTERVAL_TYPES precision is not supported: 
${relDataType.getPrecision}")
-        }
-        DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType
-
-      case NULL =>
-        new NullType()
-
-      case SYMBOL =>
-        new SymbolType()
-
-      case COLUMN_LIST =>
-        new DescriptorType()
-
-      // extract encapsulated Type
-      case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-        val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-        genericRelDataType.genericType
-
-      case ROW if relDataType.isInstanceOf[RelRecordType] =>
-        toLogicalRowType(relDataType)
-
-      case STRUCTURED if relDataType.isInstanceOf[StructuredRelDataType] =>
-        relDataType.asInstanceOf[StructuredRelDataType].getStructuredType
-
-      case MULTISET => new 
MultisetType(toLogicalType(relDataType.getComponentType))
-
-      case ARRAY => new ArrayType(toLogicalType(relDataType.getComponentType))
-
-      case MAP if relDataType.isInstanceOf[MapSqlType] =>
-        val mapRelDataType = relDataType.asInstanceOf[MapSqlType]
-        new MapType(
-          toLogicalType(mapRelDataType.getKeyType),
-          toLogicalType(mapRelDataType.getValueType))
-
-      // CURSOR for UDTF case, whose type info will never be used, just a 
placeholder
-      case CURSOR => new TypeInformationRawType[Nothing](new NothingTypeInfo)
-
-      case VARIANT => new VariantType()
-
-      case OTHER if relDataType.isInstanceOf[RawRelDataType] =>
-        relDataType.asInstanceOf[RawRelDataType].getRawType
-
-      case OTHER if relDataType.isInstanceOf[BitmapRelDataType] =>
-        relDataType.asInstanceOf[BitmapRelDataType].getBitmapType
-
-      case _ @t =>
-        throw new TableException(s"Type is not supported: $t")
-    }
-    logicalType.copy(relDataType.isNullable)
-  }
-
-  def toTableSchema(relDataType: RelDataType): TableSchema = {
-    val fieldNames = relDataType.getFieldNames.toArray(new Array[String](0))
-    val fieldTypes = relDataType.getFieldList.asScala
-      .map(
-        field =>
-          LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
-            FlinkTypeFactory.toLogicalType(field.getType)))
-      .toArray
-    TableSchema.builder.fields(fieldNames, fieldTypes).build
-  }
-
-  def toLogicalRowType(relType: RelDataType): RowType = {
-    checkArgument(relType.isStruct)
-    RowType.of(
-      relType.getFieldList.asScala
-        .map(fieldType => toLogicalType(fieldType.getType))
-        .toArray[LogicalType],
-      relType.getFieldNames.asScala.toArray)
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
index fb26a6f9af5..867834b32e5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.functions.AsyncScalarFunction;
 import org.apache.flink.table.planner.calcite.SqlToRexConverter;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.runtime.generated.GeneratedFunction;
@@ -76,23 +75,15 @@ public class AsyncCodeGeneratorTest {
                         .getPlannerContext()
                         .getTypeFactory()
                         .buildRelNodeRowType(
-                                
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
-                                JavaScalaConversionUtil.toScala(
-                                        Arrays.asList(
-                                                new IntType(),
-                                                new BigIntType(),
-                                                new VarCharType())));
+                                List.of("f1", "f2", "f3"),
+                                List.of(new IntType(), new BigIntType(), new 
VarCharType()));
         tableRowType2 =
                 plannerMocks
                         .getPlannerContext()
                         .getTypeFactory()
                         .buildRelNodeRowType(
-                                
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
-                                JavaScalaConversionUtil.toScala(
-                                        Arrays.asList(
-                                                new VarCharType(),
-                                                new VarCharType(),
-                                                new VarCharType())));
+                                List.of("f1", "f2", "f3"),
+                                List.of(new VarCharType(), new VarCharType(), 
new VarCharType()));
         
ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster());
         converter =
                 ShortcutUtils.unwrapContext(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
index 18daf6819ab..db5f7e6ec07 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.planner.calcite.SqlToRexConverter;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.runtime.generated.GeneratedFunction;
@@ -73,12 +72,8 @@ public class AsyncCorrelateCodeGeneratorTest {
                         .getPlannerContext()
                         .getTypeFactory()
                         .buildRelNodeRowType(
-                                
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
-                                JavaScalaConversionUtil.toScala(
-                                        Arrays.asList(
-                                                new IntType(),
-                                                new BigIntType(),
-                                                new VarCharType())));
+                                List.of("f1", "f2", "f3"),
+                                List.of(new IntType(), new BigIntType(), new 
VarCharType()));
         
ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster());
         plannerMocks
                 .getFunctionCatalog()
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
index c18330d73d8..f099952a58d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/AsyncUtilTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.planner.calcite.SqlToRexConverter;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -36,8 +35,8 @@ import org.apache.calcite.rex.RexNode;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.table.planner.plan.utils.AsyncUtil.containsAsyncCall;
@@ -61,12 +60,8 @@ public class AsyncUtilTest {
                         .getPlannerContext()
                         .getTypeFactory()
                         .buildRelNodeRowType(
-                                
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
-                                JavaScalaConversionUtil.toScala(
-                                        Arrays.asList(
-                                                new IntType(),
-                                                new BigIntType(),
-                                                new VarCharType())));
+                                List.of("f1", "f2", "f3"),
+                                List.of(new IntType(), new BigIntType(), new 
VarCharType()));
 
         converter =
                 ShortcutUtils.unwrapContext(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index eabc29b01ee..ed7f02d040c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -132,7 +132,7 @@ class AggCallSelectivityEstimatorTest {
   private def createInputRef(index: Int): RexInputRef = {
     val relDataType = typeFactory.createSqlType(allFieldTypes(index))
     val relDataTypeWithNullability =
-      typeFactory.createTypeWithNullability(relDataType, isNullable = false)
+      typeFactory.createTypeWithNullability(relDataType, false)
     rexBuilder.makeInputRef(relDataTypeWithNullability, index)
   }
 

Reply via email to