matriv commented on a change in pull request #18342:
URL: https://github.com/apache/flink/pull/18342#discussion_r783771529



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java
##########
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
+import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import 
org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import 
org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Symmetric converter between {@link LogicalType} and {@link RelDataType}.
+ *
+ * <p>This converter has many similarities with {@link FlinkTypeFactory} and 
might also replace it
+ * at some point. However, for now it is more consistent and does not lose 
information (i.e. for
+ * TIME(9) or interval types). It still delegates to {@link 
RelDataTypeFactory} but only for
+ * predefined/basic types.
+ *
+ * <p>Note: The conversion to {@link LogicalType} is not 100% symmetric and is 
currently optimized
+ * for expressions. Information about the {@link StructKind} of a {@link 
RelRecordType} is always
+ * set to {@link StructKind#PEEK_FIELDS_NO_EXPAND}. Missing precision and 
scale will be filled with
+ * Flink's default values such that all resulting {@link LogicalType}s will be 
fully resolved.
+ */
+@Internal
+public final class LogicalRelDataTypeConverter {
+
+    public static RelDataType toRelDataType(
+            LogicalType logicalType, RelDataTypeFactory relDataTypeFactory) {
+        final LogicalToRelDataTypeConverter converter =
+                new LogicalToRelDataTypeConverter(relDataTypeFactory);
+        final RelDataType relDataType = logicalType.accept(converter);
+        // this also canonizes in the factory (see SqlTypeFactoryImpl.canonize)
+        return relDataTypeFactory.createTypeWithNullability(relDataType, 
logicalType.isNullable());
+    }
+
+    public static LogicalType toLogicalType(
+            RelDataType relDataType, DataTypeFactory dataTypeFactory) {
+        final LogicalType logicalType = toLogicalTypeNotNull(relDataType, 
dataTypeFactory);
+        return logicalType.copy(relDataType.isNullable());
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // LogicalType to RelDataType
+    // 
--------------------------------------------------------------------------------------------
+
+    private static class LogicalToRelDataTypeConverter implements 
LogicalTypeVisitor<RelDataType> {
+
+        private final RelDataTypeFactory relDataTypeFactory;
+
+        LogicalToRelDataTypeConverter(RelDataTypeFactory relDataTypeFactory) {
+            this.relDataTypeFactory = relDataTypeFactory;
+        }
+
+        @Override
+        public RelDataType visit(CharType charType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.CHAR, 
charType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(VarCharType varCharType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR, 
varCharType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(BooleanType booleanType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+        }
+
+        @Override
+        public RelDataType visit(BinaryType binaryType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.BINARY, 
binaryType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(VarBinaryType varBinaryType) {
+            return relDataTypeFactory.createSqlType(
+                    SqlTypeName.VARBINARY, varBinaryType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(DecimalType decimalType) {
+            return relDataTypeFactory.createSqlType(
+                    SqlTypeName.DECIMAL, decimalType.getPrecision(), 
decimalType.getScale());
+        }
+
+        @Override
+        public RelDataType visit(TinyIntType tinyIntType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.TINYINT);
+        }
+
+        @Override
+        public RelDataType visit(SmallIntType smallIntType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.SMALLINT);
+        }
+
+        @Override
+        public RelDataType visit(IntType intType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
+        }
+
+        @Override
+        public RelDataType visit(BigIntType bigIntType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
+        }
+
+        @Override
+        public RelDataType visit(FloatType floatType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
+        }
+
+        @Override
+        public RelDataType visit(DoubleType doubleType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+        }
+
+        @Override
+        public RelDataType visit(DateType dateType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.DATE);
+        }
+
+        @Override
+        public RelDataType visit(TimeType timeType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.TIME, 
timeType.getPrecision());
+        }
+
+        @Override
+        public RelDataType visit(TimestampType timestampType) {
+            final RelDataType timestampRelDataType =
+                    relDataTypeFactory.createSqlType(
+                            SqlTypeName.TIMESTAMP, 
timestampType.getPrecision());
+            switch (timestampType.getKind()) {
+                case REGULAR:
+                    return timestampRelDataType;
+                case ROWTIME:
+                    assert timestampType.getPrecision() == 3;
+                    return new TimeIndicatorRelDataType(
+                            relDataTypeFactory.getTypeSystem(),
+                            (BasicSqlType) timestampRelDataType,
+                            timestampType.isNullable(),
+                            true);
+                default:
+                    throw new TableException("Unknown timestamp kind.");
+            }
+        }
+
+        @Override
+        public RelDataType visit(ZonedTimestampType zonedTimestampType) {
+            throw new TableException("TIMESTAMP WITH TIME ZONE is currently 
not supported.");
+        }
+
+        @Override
+        public RelDataType visit(LocalZonedTimestampType 
localZonedTimestampType) {
+            final RelDataType timestampRelDataType =
+                    relDataTypeFactory.createSqlType(
+                            SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                            localZonedTimestampType.getPrecision());
+            switch (localZonedTimestampType.getKind()) {
+                case REGULAR:
+                    return timestampRelDataType;
+                case ROWTIME:
+                    assert localZonedTimestampType.getPrecision() == 3;
+                    return new TimeIndicatorRelDataType(
+                            relDataTypeFactory.getTypeSystem(),
+                            (BasicSqlType) timestampRelDataType,
+                            localZonedTimestampType.isNullable(),
+                            true);
+                case PROCTIME:
+                    assert localZonedTimestampType.getPrecision() == 3;
+                    return new TimeIndicatorRelDataType(
+                            relDataTypeFactory.getTypeSystem(),
+                            (BasicSqlType) timestampRelDataType,
+                            localZonedTimestampType.isNullable(),
+                            false);
+                default:
+                    throw new TableException("Unknown timestamp kind.");
+            }
+        }
+
+        @Override
+        public RelDataType visit(YearMonthIntervalType yearMonthIntervalType) {
+            final int yearPrecision = yearMonthIntervalType.getYearPrecision();
+            final SqlIntervalQualifier intervalQualifier;
+            switch (yearMonthIntervalType.getResolution()) {
+                case YEAR:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.YEAR,
+                                    yearPrecision,
+                                    TimeUnit.YEAR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case YEAR_TO_MONTH:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.YEAR,
+                                    yearPrecision,
+                                    TimeUnit.MONTH,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case MONTH:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.MONTH,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.MONTH,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown interval 
resolution.");
+            }
+            return relDataTypeFactory.createSqlIntervalType(intervalQualifier);
+        }
+
+        @Override
+        public RelDataType visit(DayTimeIntervalType dayTimeIntervalType) {
+            final int dayPrecision = dayTimeIntervalType.getDayPrecision();
+            final int fractionalPrecision = 
dayTimeIntervalType.getFractionalPrecision();
+            final SqlIntervalQualifier intervalQualifier;
+            switch (dayTimeIntervalType.getResolution()) {
+                case DAY:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.DAY,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case DAY_TO_HOUR:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case DAY_TO_MINUTE:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case DAY_TO_SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                case HOUR:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case HOUR_TO_MINUTE:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case HOUR_TO_SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                case MINUTE:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case MINUTE_TO_SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                case SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.SECOND,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                default:
+                    throw new TableException("Unknown interval resolution.");
+            }
+            return relDataTypeFactory.createSqlIntervalType(intervalQualifier);
+        }
+
+        @Override
+        public RelDataType visit(ArrayType arrayType) {
+            final RelDataType elementRelDataType =
+                    toRelDataType(arrayType.getElementType(), 
relDataTypeFactory);
+            return relDataTypeFactory.createArrayType(elementRelDataType, -1);
+        }
+
+        @Override
+        public RelDataType visit(MultisetType multisetType) {
+            final RelDataType elementRelDataType =
+                    toRelDataType(multisetType.getElementType(), 
relDataTypeFactory);
+            return relDataTypeFactory.createMultisetType(elementRelDataType, 
-1);
+        }
+
+        @Override
+        public RelDataType visit(MapType mapType) {
+            final RelDataType keyRelDataType =
+                    toRelDataType(mapType.getKeyType(), relDataTypeFactory);
+            final RelDataType valueRelDataType =
+                    toRelDataType(mapType.getValueType(), relDataTypeFactory);
+            return relDataTypeFactory.createMapType(keyRelDataType, 
valueRelDataType);
+        }
+
+        @Override
+        public RelDataType visit(RowType rowType) {
+            return relDataTypeFactory.createStructType(
+                    StructKind.PEEK_FIELDS_NO_EXPAND,
+                    rowType.getFields().stream()
+                            .map(f -> toRelDataType(f.getType(), 
relDataTypeFactory))
+                            .collect(Collectors.toList()),
+                    rowType.getFieldNames());
+        }
+
+        @Override
+        public RelDataType visit(DistinctType distinctType) {
+            throw new TableException("DISTINCT type is currently not 
supported.");
+        }
+
+        @Override
+        public RelDataType visit(StructuredType structuredType) {
+            final List<RelDataTypeField> fields = new ArrayList<>();
+            for (int i = 0; i < structuredType.getAttributes().size(); i++) {
+                final StructuredType.StructuredAttribute attribute =
+                        structuredType.getAttributes().get(i);
+                final RelDataTypeField field =
+                        new RelDataTypeFieldImpl(
+                                attribute.getName(),
+                                i,
+                                toRelDataType(attribute.getType(), 
relDataTypeFactory));
+                fields.add(field);
+            }
+            return new StructuredRelDataType(structuredType, fields);
+        }
+
+        @Override
+        public RelDataType visit(NullType nullType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.NULL);
+        }
+
+        @Override
+        public RelDataType visit(RawType<?> rawType) {
+            return new RawRelDataType(rawType);
+        }
+
+        @Override
+        public RelDataType visit(SymbolType<?> symbolType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.SYMBOL);
+        }
+
+        @Override
+        public RelDataType visit(LogicalType other) {
+            throw new TableException(
+                    String.format(
+                            "Logical type '%s' cannot be converted to a 
RelDataType.", other));
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // RelDataType to LogicalType
+    // 
--------------------------------------------------------------------------------------------
+
+    private static LogicalType toLogicalTypeNotNull(
+            RelDataType relDataType, DataTypeFactory dataTypeFactory) {
+        // dataTypeFactory is a preparation for catalog user-defined types
+        switch (relDataType.getSqlTypeName()) {
+            case BOOLEAN:
+                return new BooleanType(false);
+            case TINYINT:
+                return new TinyIntType(false);
+            case SMALLINT:
+                return new SmallIntType(false);
+            case INTEGER:
+                return new IntType(false);
+            case BIGINT:
+                return new BigIntType(false);
+            case DECIMAL:
+                if (relDataType.getScale() < 0) {

Review comment:
       What's the case that this code addresses? Forgive me if I didn't see, is 
it also tested?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.planner.calcite.FlinkContextImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+
+import java.io.IOException;
+
+/** Mocks and utilities for serde tests. */
+class JsonSerdeMocks {
+
+    static SerdeContext configuredSerdeContext() {
+        return configuredSerdeContext(
+                CatalogManagerMocks.createEmptyCatalogManager(), 
TableConfig.getDefault());
+    }
+
+    static SerdeContext configuredSerdeContext(
+            CatalogManager catalogManager, TableConfig tableConfig) {
+        return new SerdeContext(
+                new FlinkContextImpl(
+                        false, tableConfig, new ModuleManager(), null, 
catalogManager, null),
+                Thread.currentThread().getContextClassLoader(),
+                FlinkTypeFactory.INSTANCE(),
+                FlinkSqlOperatorTable.instance());
+    }
+
+    static String toJson(SerdeContext serdeContext, Object object) {
+        final ObjectWriter objectWriter = 
JsonSerdeUtil.createObjectWriter(serdeContext);
+        final String json;
+        try {
+            json = objectWriter.writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            throw new AssertionError(e);
+        }
+        return json;
+    }
+
+    static <T> T toObject(SerdeContext serdeContext, String json, Class<T> 
clazz) {
+        final ObjectReader objectReader = 
JsonSerdeUtil.createObjectReader(serdeContext);
+        try {
+            return objectReader.readValue(json, clazz);
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    private JsonSerdeMocks() {

Review comment:
       nit: maybe move to the top of the class.

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java
##########
@@ -46,83 +35,113 @@
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject;
+import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for serialization/deserialization of {@link RelDataType}. */
-@RunWith(Parameterized.class)
+/** Tests for {@link RelDataType} serialization and deserialization. */
 public class RelDataTypeJsonSerdeTest {
+
     private static final FlinkTypeFactory FACTORY = 
FlinkTypeFactory.INSTANCE();
 
-    @Parameterized.Parameters(name = "type = {0}")
-    public static Collection<RelDataType> parameters() {
+    @ParameterizedTest
+    @MethodSource("testRelDataTypeSerde")
+    public void testRelDataTypeSerde(RelDataType relDataType) throws 
IOException {
+        final SerdeContext serdeContext = configuredSerdeContext();
+
+        final String json = toJson(serdeContext, relDataType);
+        final RelDataType actual = toObject(serdeContext, json, 
RelDataType.class);
+
+        assertThat(actual).isSameAs(relDataType);
+    }
+
+    @Test
+    public void testMissingPrecisionAndScale() {
+        final SerdeContext serdeContext = configuredSerdeContext();
+
+        final String json =
+                toJson(
+                        serdeContext,
+                        FACTORY.createSqlIntervalType(
+                                new SqlIntervalQualifier(
+                                        TimeUnit.DAY, TimeUnit.SECOND, 
SqlParserPos.ZERO)));
+        final RelDataType actual = toObject(serdeContext, json, 
RelDataType.class);
+
+        assertThat(actual)
+                .isSameAs(
+                        FACTORY.createSqlIntervalType(
+                                new SqlIntervalQualifier(
+                                        TimeUnit.DAY,
+                                        
DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+                                        TimeUnit.SECOND,
+                                        
DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION,
+                                        SqlParserPos.ZERO)));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Test data
+    // 
--------------------------------------------------------------------------------------------
+
+    @Parameters(name = "{0}")
+    public static List<RelDataType> testRelDataTypeSerde() {
         // the values in the list do not care about nullable.
-        List<RelDataType> types =
+        final List<RelDataType> types =
                 Arrays.asList(
                         FACTORY.createSqlType(SqlTypeName.BOOLEAN),
                         FACTORY.createSqlType(SqlTypeName.TINYINT),
                         FACTORY.createSqlType(SqlTypeName.SMALLINT),
                         FACTORY.createSqlType(SqlTypeName.INTEGER),
                         FACTORY.createSqlType(SqlTypeName.BIGINT),
-                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 3, 10),
-                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 0, 19),
-                        FACTORY.createSqlType(SqlTypeName.DECIMAL, -1, 19),

Review comment:
       Shouldn't we test the negative value?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
##########
@@ -132,101 +130,35 @@
                                 BigDecimal.valueOf(random.nextFloat()),
                                 FACTORY.createSqlType(SqlTypeName.FLOAT)),
                         
rexBuilder.makeExactLiteral(BigDecimal.valueOf(random.nextDouble())),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.YEAR, 
SqlParserPos.ZERO)),
                         rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(100),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.YEAR, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.MONTH, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MONTH, TimeUnit.MONTH, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MONTH, TimeUnit.MONTH, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.DAY, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.DAY, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.HOUR, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.HOUR, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.MINUTE, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.MINUTE, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.HOUR, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.HOUR, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.MINUTE, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.MINUTE, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
+                                        TimeUnit.YEAR,
+                                        4,
+                                        TimeUnit.YEAR,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        SqlParserPos.ZERO)),
                         rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(3),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.MINUTE, 
SqlParserPos.ZERO)),
+                                        TimeUnit.YEAR,
+                                        2,
+                                        TimeUnit.MONTH,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        SqlParserPos.ZERO)),
                         rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(3),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.MINUTE, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
+                                        TimeUnit.DAY, 2, TimeUnit.SECOND, 6, 
SqlParserPos.ZERO)),
                         rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(3),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.SECOND, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.SECOND, TimeUnit.SECOND, 
SqlParserPos.ZERO)),
+                                        TimeUnit.SECOND, 2, TimeUnit.SECOND, 
6, SqlParserPos.ZERO)),
                         
rexBuilder.makeDateLiteral(DateString.fromDaysSinceEpoch(10)),
                         rexBuilder.makeDateLiteral(new 
DateString("2000-12-12")),
                         
rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(1234), 3),
                         
rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(123456), 6),
                         rexBuilder.makeTimeLiteral(new 
TimeString("01:01:01.000000001"), 9),
-                        rexBuilder.makeTimeWithLocalTimeZoneLiteral(

Review comment:
       Why did you remove this, and also before some interval related ones?

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.planner.calcite.FlinkContextImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+
+import java.io.IOException;
+
+/** Mocks and utilities for serde tests. */
+class JsonSerdeMocks {

Review comment:
       could also be `final` as a utility class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to