twalthr commented on a change in pull request #19113: URL: https://github.com/apache/flink/pull/19113#discussion_r830954244
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeSystem.java ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; +import org.apache.flink.util.function.QuadFunction; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; + +import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; + +/** Custom type system for Flink. */ +public class FlinkTypeSystem extends RelDataTypeSystemImpl { + + public static final FlinkTypeSystem INSTANCE = new FlinkTypeSystem(); + public static final DecimalType DECIMAL_SYSTEM_DEFAULT = + new DecimalType(DecimalType.MAX_PRECISION, 18); + + private FlinkTypeSystem() {} + + @Override + public int getMaxNumericPrecision() { + // set the maximum precision of a NUMERIC or DECIMAL type to DecimalType.MAX_PRECISION. + return DecimalType.MAX_PRECISION; + } + + @Override + public int getMaxNumericScale() { + // the max scale can't be greater than precision + return DecimalType.MAX_PRECISION; + } + + @Override + public int getDefaultPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case VARBINARY: + // Calcite will limit the length of the VARCHAR field to 65536 + return Integer.MAX_VALUE; + case TIMESTAMP: + // by default we support timestamp with microseconds precision (Timestamp(6)) + return TimestampType.DEFAULT_PRECISION; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // by default we support timestamp with local time zone with microseconds precision + // Timestamp(6) with local time zone + return LocalZonedTimestampType.DEFAULT_PRECISION; + } + return super.getDefaultPrecision(typeName); + } + + @Override + public int getMaxPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case CHAR: + case VARBINARY: + case BINARY: + return Integer.MAX_VALUE; + + case TIMESTAMP: + // The maximum precision of TIMESTAMP is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return TimestampType.MAX_PRECISION; + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // The maximum precision of TIMESTAMP_WITH_LOCAL_TIME_ZONE is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return LocalZonedTimestampType.MAX_PRECISION; + } + return super.getMaxPrecision(typeName); + } + + @Override + public boolean shouldConvertRaggedUnionTypesToVarying() { + // when union a number of CHAR types of different lengths, we should cast to a VARCHAR + // this fixes the problem of CASE WHEN with different length string literals but get wrong + // result with additional space suffix + return true; + } + + @Override + public RelDataType deriveAvgAggType( + RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findAvgAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findSumAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveDecimalPlusType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findAdditionDecimalType); + } + + @Override + public RelDataType deriveDecimalModType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + if (SqlTypeUtil.isExactNumeric(type1) + && SqlTypeUtil.isExactNumeric(type2) + && (SqlTypeUtil.isDecimal(type1) || SqlTypeUtil.isDecimal(type2))) { + RelDataType decType1 = adjustType(typeFactory, type1); + RelDataType decType2 = adjustType(typeFactory, type2); + if (decType1.getScale() == 0 && decType2.getScale() == 0) { + return type2; + } + DecimalType result = + LogicalTypeMerging.findModuloDecimalType( + decType1.getPrecision(), + decType1.getScale(), + decType2.getPrecision(), + decType2.getScale()); + return typeFactory.createSqlType(DECIMAL, result.getPrecision(), result.getScale()); + } else { + return null; + } + } + + @Override + public RelDataType deriveDecimalDivideType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findDivisionDecimalType); + } + + @Override + public RelDataType deriveDecimalMultiplyType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findMultiplicationDecimalType); + } + + /** Use derivation from [[LogicalTypeMerging]] to derive decimal type. */ Review comment: update to JavaDoc ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeSystem.java ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; +import org.apache.flink.util.function.QuadFunction; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; + +import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; + +/** Custom type system for Flink. */ +public class FlinkTypeSystem extends RelDataTypeSystemImpl { + + public static final FlinkTypeSystem INSTANCE = new FlinkTypeSystem(); + public static final DecimalType DECIMAL_SYSTEM_DEFAULT = + new DecimalType(DecimalType.MAX_PRECISION, 18); + + private FlinkTypeSystem() {} + + @Override + public int getMaxNumericPrecision() { + // set the maximum precision of a NUMERIC or DECIMAL type to DecimalType.MAX_PRECISION. + return DecimalType.MAX_PRECISION; + } + + @Override + public int getMaxNumericScale() { + // the max scale can't be greater than precision + return DecimalType.MAX_PRECISION; + } + + @Override + public int getDefaultPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case VARBINARY: + // Calcite will limit the length of the VARCHAR field to 65536 + return Integer.MAX_VALUE; + case TIMESTAMP: + // by default we support timestamp with microseconds precision (Timestamp(6)) + return TimestampType.DEFAULT_PRECISION; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // by default we support timestamp with local time zone with microseconds precision + // Timestamp(6) with local time zone + return LocalZonedTimestampType.DEFAULT_PRECISION; + } + return super.getDefaultPrecision(typeName); + } + + @Override + public int getMaxPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case CHAR: + case VARBINARY: + case BINARY: + return Integer.MAX_VALUE; + + case TIMESTAMP: + // The maximum precision of TIMESTAMP is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return TimestampType.MAX_PRECISION; + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // The maximum precision of TIMESTAMP_WITH_LOCAL_TIME_ZONE is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return LocalZonedTimestampType.MAX_PRECISION; + } + return super.getMaxPrecision(typeName); + } + + @Override + public boolean shouldConvertRaggedUnionTypesToVarying() { + // when union a number of CHAR types of different lengths, we should cast to a VARCHAR + // this fixes the problem of CASE WHEN with different length string literals but get wrong + // result with additional space suffix + return true; + } + + @Override + public RelDataType deriveAvgAggType( + RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findAvgAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findSumAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveDecimalPlusType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findAdditionDecimalType); + } + + @Override + public RelDataType deriveDecimalModType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + if (SqlTypeUtil.isExactNumeric(type1) Review comment: Use `deriveDecimalType`? ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala ########## @@ -565,7 +566,10 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { inputType: RelDataType, isBounded: Boolean): AggregateCall = { val outputIndexToAggCallIndexMap = AggregateUtil.getOutputIndexToAggCallIndexMap( - aggCalls, inputType, isBounded) + ShortcutUtils.unwrapTypeFactory(input), Review comment: how about we do a static import for all `ShortcutUtils.unwrapTypeFactory` to make the code more concise. I added a similar feedback to Marios PR recently. ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala ########## @@ -404,12 +422,13 @@ object AggregateUtil extends Enumeration { indexOfExistingCountStar: Option[Int], isStateBackedDataViews: Boolean, needDistinctInfo: Boolean, - isBounded: Boolean): AggregateInfoList = { + isBounded: Boolean) = { Review comment: @slinkydeveloper still why this? ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala ########## @@ -404,12 +422,13 @@ object AggregateUtil extends Enumeration { indexOfExistingCountStar: Option[Int], isStateBackedDataViews: Boolean, needDistinctInfo: Boolean, - isBounded: Boolean): AggregateInfoList = { + isBounded: Boolean) = { Review comment: why this? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeSystem.java ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; +import org.apache.flink.util.function.QuadFunction; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; + +import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; + +/** Custom type system for Flink. */ +public class FlinkTypeSystem extends RelDataTypeSystemImpl { + + public static final FlinkTypeSystem INSTANCE = new FlinkTypeSystem(); + public static final DecimalType DECIMAL_SYSTEM_DEFAULT = + new DecimalType(DecimalType.MAX_PRECISION, 18); + + private FlinkTypeSystem() {} + + @Override + public int getMaxNumericPrecision() { + // set the maximum precision of a NUMERIC or DECIMAL type to DecimalType.MAX_PRECISION. + return DecimalType.MAX_PRECISION; + } + + @Override + public int getMaxNumericScale() { + // the max scale can't be greater than precision + return DecimalType.MAX_PRECISION; + } + + @Override + public int getDefaultPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case VARBINARY: + // Calcite will limit the length of the VARCHAR field to 65536 + return Integer.MAX_VALUE; + case TIMESTAMP: + // by default we support timestamp with microseconds precision (Timestamp(6)) + return TimestampType.DEFAULT_PRECISION; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // by default we support timestamp with local time zone with microseconds precision + // Timestamp(6) with local time zone + return LocalZonedTimestampType.DEFAULT_PRECISION; + } + return super.getDefaultPrecision(typeName); + } + + @Override + public int getMaxPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case CHAR: + case VARBINARY: + case BINARY: + return Integer.MAX_VALUE; + + case TIMESTAMP: + // The maximum precision of TIMESTAMP is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return TimestampType.MAX_PRECISION; + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // The maximum precision of TIMESTAMP_WITH_LOCAL_TIME_ZONE is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return LocalZonedTimestampType.MAX_PRECISION; + } + return super.getMaxPrecision(typeName); + } + + @Override + public boolean shouldConvertRaggedUnionTypesToVarying() { + // when union a number of CHAR types of different lengths, we should cast to a VARCHAR + // this fixes the problem of CASE WHEN with different length string literals but get wrong + // result with additional space suffix + return true; + } + + @Override + public RelDataType deriveAvgAggType( + RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findAvgAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findSumAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveDecimalPlusType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findAdditionDecimalType); + } + + @Override + public RelDataType deriveDecimalModType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + if (SqlTypeUtil.isExactNumeric(type1) + && SqlTypeUtil.isExactNumeric(type2) + && (SqlTypeUtil.isDecimal(type1) || SqlTypeUtil.isDecimal(type2))) { + RelDataType decType1 = adjustType(typeFactory, type1); + RelDataType decType2 = adjustType(typeFactory, type2); + if (decType1.getScale() == 0 && decType2.getScale() == 0) { + return type2; + } + DecimalType result = + LogicalTypeMerging.findModuloDecimalType( + decType1.getPrecision(), + decType1.getScale(), + decType2.getPrecision(), + decType2.getScale()); + return typeFactory.createSqlType(DECIMAL, result.getPrecision(), result.getScale()); + } else { + return null; + } + } + + @Override + public RelDataType deriveDecimalDivideType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findDivisionDecimalType); + } + + @Override + public RelDataType deriveDecimalMultiplyType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findMultiplicationDecimalType); + } + + /** Use derivation from [[LogicalTypeMerging]] to derive decimal type. */ + private RelDataType deriveDecimalType( Review comment: add `@Nullable` ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/MatchCodeGenerator.scala ########## @@ -678,6 +678,7 @@ class MatchCodeGenerator( matchAgg.inputExprs.indices.map(i => s"TMP$i")) val aggInfoList = AggregateUtil.transformToStreamAggregateInfoList( + relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory], Review comment: you could use `ShortcutUtils` here as well ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeSystem.java ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; +import org.apache.flink.util.function.QuadFunction; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; + +import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; + +/** Custom type system for Flink. */ +public class FlinkTypeSystem extends RelDataTypeSystemImpl { + + public static final FlinkTypeSystem INSTANCE = new FlinkTypeSystem(); + public static final DecimalType DECIMAL_SYSTEM_DEFAULT = + new DecimalType(DecimalType.MAX_PRECISION, 18); + + private FlinkTypeSystem() {} + + @Override + public int getMaxNumericPrecision() { + // set the maximum precision of a NUMERIC or DECIMAL type to DecimalType.MAX_PRECISION. + return DecimalType.MAX_PRECISION; + } + + @Override + public int getMaxNumericScale() { + // the max scale can't be greater than precision + return DecimalType.MAX_PRECISION; + } + + @Override + public int getDefaultPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case VARBINARY: + // Calcite will limit the length of the VARCHAR field to 65536 + return Integer.MAX_VALUE; + case TIMESTAMP: + // by default we support timestamp with microseconds precision (Timestamp(6)) + return TimestampType.DEFAULT_PRECISION; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // by default we support timestamp with local time zone with microseconds precision + // Timestamp(6) with local time zone + return LocalZonedTimestampType.DEFAULT_PRECISION; + } + return super.getDefaultPrecision(typeName); + } + + @Override + public int getMaxPrecision(SqlTypeName typeName) { + switch (typeName) { + case VARCHAR: + case CHAR: + case VARBINARY: + case BINARY: + return Integer.MAX_VALUE; + + case TIMESTAMP: + // The maximum precision of TIMESTAMP is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return TimestampType.MAX_PRECISION; + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // The maximum precision of TIMESTAMP_WITH_LOCAL_TIME_ZONE is 3 in Calcite, + // change it to 9 to support nanoseconds precision + return LocalZonedTimestampType.MAX_PRECISION; + } + return super.getMaxPrecision(typeName); + } + + @Override + public boolean shouldConvertRaggedUnionTypesToVarying() { + // when union a number of CHAR types of different lengths, we should cast to a VARCHAR + // this fixes the problem of CASE WHEN with different length string literals but get wrong + // result with additional space suffix + return true; + } + + @Override + public RelDataType deriveAvgAggType( + RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findAvgAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argRelDataType) { + LogicalType argType = FlinkTypeFactory.toLogicalType(argRelDataType); + LogicalType resultType = LogicalTypeMerging.findSumAggType(argType); + return unwrapTypeFactory(typeFactory).createFieldTypeFromLogicalType(resultType); + } + + @Override + public RelDataType deriveDecimalPlusType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + return deriveDecimalType( + typeFactory, type1, type2, LogicalTypeMerging::findAdditionDecimalType); + } + + @Override + public RelDataType deriveDecimalModType( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + if (SqlTypeUtil.isExactNumeric(type1) Review comment: @slinkydeveloper what was the reason for this code duplication? ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java ########## @@ -45,12 +45,16 @@ * </ul> */ public class SourceAbilityContext implements FlinkContext { + private final RowType sourceRowType; private final FlinkContext context; + private final FlinkTypeFactory typeFactory; - public SourceAbilityContext(FlinkContext context, RowType sourceRowType) { + public SourceAbilityContext( Review comment: nit: `FlinkContext context, FlinkTypeFactory typeFactory, RowType sourceRowType` it is otherwise weird to have the row type between context parameters ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkTypeSystem.java ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; +import org.apache.flink.util.function.QuadFunction; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; + +import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; + +/** Custom type system for Flink. */ +public class FlinkTypeSystem extends RelDataTypeSystemImpl { Review comment: add `@Internal` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org