This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 73bb670ec6888ce55a9185a222b7b4a7cdb62d05 Author: Jark Wu <[email protected]> AuthorDate: Wed Jul 31 16:44:51 2019 +0800 [FLINK-13290][table-api] Add method to check LogicalType compatibility Add an areTypesCompatible() method to LogicalTypeChecks. This will compare two LogicalTypes without field names and other logical attributes (e.g. description, isFinal). --- .../types/logical/utils/LogicalTypeChecks.java | 61 +++++ .../types/LogicalTypeCompatibleCheckTest.java | 300 +++++++++++++++++++++ 2 files changed, 361 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java index 3e2962e..45e2318 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java @@ -39,6 +39,10 @@ import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.YearMonthIntervalType; import org.apache.flink.table.types.logical.ZonedTimestampType; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Utilities for checking {@link LogicalType} and avoiding a lot of type casting and repetitive work. */ @@ -145,6 +149,20 @@ public final class LogicalTypeChecks { return logicalType.accept(SINGLE_FIELD_INTERVAL_EXTRACTOR); } + /** + * Returns true if the two given types are compatible. Types are compatible if: for atomic types + * (VARCHAR, INT, BOOLEAN, etc.), they are fully equal (i.e. {@link LogicalType#equals(Object)}); + * for complex types (ARRAY, ROW, MAP, etc.), they are of the same type, ignoring field + * names and other logical attributes, and all the children types ({@link LogicalType#getChildren()}) + * are compatible too. 
+ */ + public static boolean areTypesCompatible(LogicalType thisType, LogicalType thatType) { + checkNotNull(thisType); + checkNotNull(thatType); + TypeCompatibleVisitor visitor = new TypeCompatibleVisitor(thisType); + return thatType.accept(visitor); + } + private LogicalTypeChecks() { // no instantiation } @@ -332,4 +350,47 @@ public final class LogicalTypeChecks { } } } + + private static class TypeCompatibleVisitor extends LogicalTypeDefaultVisitor<Boolean> { + + private final LogicalType thisType; + + private TypeCompatibleVisitor(LogicalType thisType) { + checkNotNull(thisType); + this.thisType = thisType; + } + + @Override + protected Boolean defaultMethod(LogicalType thatType) { + checkNotNull(thatType); + if (thisType == thatType) { + return true; + } + if (thisType.getClass() != thatType.getClass() || + thisType.isNullable() != thatType.isNullable() || + thisType.getTypeRoot() != thatType.getTypeRoot()) { + return false; + } + + List<LogicalType> thisChildren = thisType.getChildren(); + List<LogicalType> thatChildren = thatType.getChildren(); + if (thisChildren.size() != thatChildren.size()) { + return false; + } + if (thisChildren.isEmpty()) { + // if it is an atomic type, delegate to equals method. 
+ return thisType.equals(thatType); + } else { + // if it is composite type, only need to check children types + for (int i = 0; i < thisChildren.size(); i++) { + LogicalType thisChild = thisChildren.get(i); + LogicalType thatChild = thatChildren.get(i); + if (!areTypesCompatible(thisChild, thatChild)) { + return false; + } + } + return true; + } + } + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCompatibleCheckTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCompatibleCheckTest.java new file mode 100644 index 0000000..ba38c08 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCompatibleCheckTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.TypeInformationAnyType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.logical.ZonedTimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import 
org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link LogicalTypeChecks#areTypesCompatible(LogicalType, LogicalType)}. + */ +@RunWith(Parameterized.class) +public class LogicalTypeCompatibleCheckTest { + + @Parameters(name = "{index}: [{0} COMPATIBLE {1} => {2}") + public static List<Object[]> testData() { + return Arrays.asList( + new Object[][]{ + {new CharType(), new CharType(5), false}, + + {new VarCharType(), new VarCharType(33), false}, + + {new BooleanType(), new BooleanType(false), false}, + + {new BinaryType(), new BinaryType(22), false}, + + {new VarBinaryType(), new VarBinaryType(44), false}, + + {new DecimalType(), new DecimalType(10, 2), false}, + + {new TinyIntType(), new TinyIntType(false), false}, + + {new SmallIntType(), new SmallIntType(false), false}, + + {new IntType(), new IntType(false), false}, + + {new BigIntType(), new BigIntType(false), false}, + + {new FloatType(), new FloatType(false), false}, + + {new DoubleType(), new DoubleType(false), false}, + + {new DateType(), new DateType(false), false}, + + {new TimeType(), new TimeType(9), false}, + + {new TimestampType(9), new TimestampType(3), false}, + + {new ZonedTimestampType(9), new ZonedTimestampType(3), false}, + + {new ZonedTimestampType(false, TimestampKind.PROCTIME, 9), new ZonedTimestampType(3), false}, + + { + new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 2), + new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.MONTH), + false + }, + + { + new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6), + new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7), + 
false + }, + + { + new ArrayType(new TimestampType()), + new ArrayType(new SmallIntType()), + false, + }, + + { + new MultisetType(new TimestampType()), + new MultisetType(new SmallIntType()), + false + }, + + { + new MapType(new VarCharType(20), new TimestampType()), + new MapType(new VarCharType(99), new TimestampType()), + false + }, + + { + new RowType( + Arrays.asList( + new RowType.RowField("a", new VarCharType()), + new RowType.RowField("b", new VarCharType()), + new RowType.RowField("c", new VarCharType()), + new RowType.RowField("d", new TimestampType()))), + new RowType( + Arrays.asList( + new RowType.RowField("_a", new VarCharType()), + new RowType.RowField("_b", new VarCharType()), + new RowType.RowField("_c", new VarCharType()), + new RowType.RowField("_d", new TimestampType()))), + // field name doesn't matter + true + }, + + { + new RowType( + Arrays.asList( + new RowField("f1", new IntType()), + new RowField("f2", new VarCharType()) + ) + ), + new RowType( + Arrays.asList( + new RowField("f1", new IntType()), + new RowField("f2", new BooleanType()) + ) + ), + false + }, + + { + new ArrayType( + new RowType( + Arrays.asList( + new RowField("f1", new IntType()), + new RowField("f2", new IntType()) + ) + ) + ), + new ArrayType( + new RowType( + Arrays.asList( + new RowField("f3", new IntType()), + new RowField("f4", new IntType()) + ) + ) + ), + true + }, + + { + new MapType( + new IntType(), + new RowType( + Arrays.asList( + new RowField("f1", new IntType()), + new RowField("f2", new IntType()) + ) + ) + ), + new MapType( + new IntType(), + new RowType( + Arrays.asList( + new RowField("f3", new IntType()), + new RowField("f4", new IntType()) + ) + ) + ), + true + }, + + { + new MultisetType( + new RowType( + Arrays.asList( + new RowField("f1", new IntType()), + new RowField("f2", new IntType()) + ) + ) + ), + new MultisetType( + new RowType( + Arrays.asList( + new RowField("f1", new IntType()), + new RowField("f2", new IntType()) + ) + ) + ), + true + 
}, + + { + new TypeInformationAnyType<>(Types.GENERIC(LogicalTypesTest.class)), + new TypeInformationAnyType<>(Types.GENERIC(Object.class)), + false + }, + + { + createUserType(new IntType(), new VarCharType()), + createUserType(new IntType(), new VarCharType()), + true + }, + + { + createDistinctType(new DecimalType(10, 2)), + createDistinctType(new DecimalType(10, 2)), + true + } + } + ); + } + + @Parameter + public LogicalType sourceType; + + @Parameter(1) + public LogicalType targetType; + + @Parameter(2) + public boolean equals; + + @Test + public void testAreTypesCompatible() { + assertThat( + LogicalTypeChecks.areTypesCompatible(sourceType, targetType), + equalTo(equals)); + assertTrue(LogicalTypeChecks.areTypesCompatible(sourceType, sourceType.copy())); + assertTrue(LogicalTypeChecks.areTypesCompatible(targetType, targetType.copy())); + } + + private static DistinctType createDistinctType(LogicalType sourceType) { + return new DistinctType.Builder( + ObjectIdentifier.of("cat", "db", UUID.randomUUID().toString()), + sourceType) + .setDescription("Money type desc.") + .build(); + } + + private static StructuredType createUserType(LogicalType... children) { + return new StructuredType.Builder( + ObjectIdentifier.of("cat", "db", "User"), + Arrays.stream(children).map(lt -> + new StructuredType.StructuredAttribute(UUID.randomUUID().toString(), lt)) + .collect(Collectors.toList())) + .setDescription("User type desc.") + .setFinal(true) + .setInstantiable(true) + .setImplementationClass(User.class) + .build(); + } + + private static final class User { + public int setting; + } +}
