matriv commented on a change in pull request #19190: URL: https://github.com/apache/flink/pull/19190#discussion_r838375382
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExtractInputTypeStrategy.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.TimeIntervalUnit; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.utils.DateTimeUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME; + +/** + * Type strategy for EXTRACT, checking the first value is a valid literal of type {@link + * TimeIntervalUnit}, and that the combination of the second argument type and the interval unit is + * correct. + */ +@Internal +public class ExtractInputTypeStrategy implements InputTypeStrategy { + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.of(2); + } + + @Override + public Optional<List<DataType>> inferInputTypes( + CallContext callContext, boolean throwOnFailure) { + List<DataType> args = callContext.getArgumentDataTypes(); + LogicalType temporalArg = args.get(1).getLogicalType(); + if (!temporalArg.isAnyOf(LogicalTypeFamily.DATETIME, LogicalTypeFamily.INTERVAL)) { + return callContext.failInference( Review comment: Can we please add tests for all these 3 errors that can be produced? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; + +import java.util.Optional; + +/** Type strategy of {@code TO_TIMESTAMP_LTZ}. */ +@Internal +public class ToTimestampLtzTypeStrategy implements TypeStrategy { + + @Override + public Optional<DataType> inferType(CallContext callContext) { + if (callContext.isArgumentLiteral(1)) { + int precision = callContext.getArgumentValue(1, Integer.class).get(); Review comment: from documentation: ``` Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). ``` Shouldn't we fail here if precision is not `0` or `3`? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java ########## @@ -1222,76 +1231,103 @@ BuiltInFunctionDefinition.newBuilder() .name("extract") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.EXTRACT) + .outputTypeStrategy(explicit(BIGINT().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_DATE = BuiltInFunctionDefinition.newBuilder() .name("currentDate") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(DATE().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIME = BuiltInFunctionDefinition.newBuilder() .name("currentTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_ROW_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentRowTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIME = BuiltInFunctionDefinition.newBuilder() .name("localTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("localTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP(3).notNull())) .build(); public static final BuiltInFunctionDefinition TEMPORAL_OVERLAPS = BuiltInFunctionDefinition.newBuilder() .name("temporalOverlaps") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.TEMPORAL_OVERLAPS) + .outputTypeStrategy(explicit(BOOLEAN().notNull())) .build(); public static final BuiltInFunctionDefinition DATE_FORMAT = BuiltInFunctionDefinition.newBuilder() .name("dateFormat") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + or( + sequence( + logical(LogicalTypeFamily.TIMESTAMP), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)))) + .outputTypeStrategy(explicit(STRING().notNull())) .build(); public static final BuiltInFunctionDefinition TIMESTAMP_DIFF = BuiltInFunctionDefinition.newBuilder() .name("timestampDiff") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + sequence( + symbol( + TimePointUnit.YEAR, + TimePointUnit.QUARTER, + TimePointUnit.MONTH, + TimePointUnit.WEEK, + TimePointUnit.DAY, + TimePointUnit.HOUR, + TimePointUnit.MINUTE, + TimePointUnit.SECOND), + logical(LogicalTypeFamily.DATETIME), + logical(LogicalTypeFamily.DATETIME))) + .outputTypeStrategy(explicit(INT())) Review comment: What happens if one of the 2 inputs (timestamp or pattern) is null? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java ########## @@ -1222,76 +1231,103 @@ BuiltInFunctionDefinition.newBuilder() .name("extract") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.EXTRACT) + .outputTypeStrategy(explicit(BIGINT().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_DATE = BuiltInFunctionDefinition.newBuilder() .name("currentDate") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(DATE().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIME = BuiltInFunctionDefinition.newBuilder() .name("currentTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_ROW_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentRowTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIME = BuiltInFunctionDefinition.newBuilder() .name("localTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("localTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP(3).notNull())) .build(); public static final BuiltInFunctionDefinition TEMPORAL_OVERLAPS = BuiltInFunctionDefinition.newBuilder() .name("temporalOverlaps") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.TEMPORAL_OVERLAPS) + .outputTypeStrategy(explicit(BOOLEAN().notNull())) .build(); public static final BuiltInFunctionDefinition DATE_FORMAT = BuiltInFunctionDefinition.newBuilder() .name("dateFormat") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + or( + sequence( + logical(LogicalTypeFamily.TIMESTAMP), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)))) + .outputTypeStrategy(explicit(STRING().notNull())) Review comment: What happens if one of the 2 inputs (timestamp or pattern) is null? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java ########## @@ -93,6 +93,17 @@ default ValidationException newValidationError(String message, Object... args) { return new ValidationException(String.format(message, args)); } + /** + * Shorthand for {@code if (throwOnFailure) throw ValidationException(...) else return Review comment: The comment here is not useful, just copies the actual code. And it's a utility method on an interface, can't we define it elsewhere as a static method? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java ########## @@ -1222,76 +1231,103 @@ BuiltInFunctionDefinition.newBuilder() .name("extract") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.EXTRACT) + .outputTypeStrategy(explicit(BIGINT().notNull())) Review comment: Extract can return null if the DATETIME arg is null: ``` SELECT EXTRACT(DAY FROM CAST(null AS DATE)) ``` returns ``` +----------------------+ | EXPR$0 | +----------------------+ | <NULL> | +----------------------+ ``` ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TemporalOverlapsInputTypeStrategy.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** Type strategy of {@code TO_TIMESTAMP_LTZ}. */ +@Internal +public class TemporalOverlapsInputTypeStrategy implements InputTypeStrategy { + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.of(4); + } + + @Override + public Optional<List<DataType>> inferInputTypes( + CallContext callContext, boolean throwOnFailure) { + List<DataType> args = callContext.getArgumentDataTypes(); + LogicalType leftTimePoint = args.get(0).getLogicalType(); + LogicalType leftTemporal = args.get(1).getLogicalType(); + LogicalType rightTimePoint = args.get(2).getLogicalType(); + LogicalType rightTemporal = args.get(3).getLogicalType(); + if (!leftTimePoint.is(LogicalTypeFamily.DATETIME)) { + return callContext.failInference( Review comment: can we also test all those failures? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java ########## @@ -1222,76 +1231,103 @@ BuiltInFunctionDefinition.newBuilder() .name("extract") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.EXTRACT) + .outputTypeStrategy(explicit(BIGINT().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_DATE = BuiltInFunctionDefinition.newBuilder() .name("currentDate") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(DATE().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIME = BuiltInFunctionDefinition.newBuilder() .name("currentTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_ROW_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentRowTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIME = BuiltInFunctionDefinition.newBuilder() .name("localTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("localTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP(3).notNull())) .build(); public static final BuiltInFunctionDefinition TEMPORAL_OVERLAPS = BuiltInFunctionDefinition.newBuilder() .name("temporalOverlaps") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.TEMPORAL_OVERLAPS) + .outputTypeStrategy(explicit(BOOLEAN().notNull())) .build(); public static final BuiltInFunctionDefinition DATE_FORMAT = BuiltInFunctionDefinition.newBuilder() .name("dateFormat") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + or( + sequence( + logical(LogicalTypeFamily.TIMESTAMP), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)))) + .outputTypeStrategy(explicit(STRING().notNull())) .build(); public static final BuiltInFunctionDefinition TIMESTAMP_DIFF = BuiltInFunctionDefinition.newBuilder() .name("timestampDiff") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + sequence( + symbol( + TimePointUnit.YEAR, + TimePointUnit.QUARTER, + TimePointUnit.MONTH, + TimePointUnit.WEEK, + TimePointUnit.DAY, + TimePointUnit.HOUR, + TimePointUnit.MINUTE, + TimePointUnit.SECOND), + logical(LogicalTypeFamily.DATETIME), + logical(LogicalTypeFamily.DATETIME))) + .outputTypeStrategy(explicit(INT())) .build(); public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ = BuiltInFunctionDefinition.newBuilder() .name("toTimestampLtz") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + sequence( + logical(LogicalTypeFamily.NUMERIC), Review comment: I don't get this, why separate `NUMERIC` and `INTEGER_NUMERIC`? and allow nullability in the first? Could you please explain? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
