This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f6b662f83de [FLINK-33412] Implement type inference for reinterpret_cast function
f6b662f83de is described below
commit f6b662f83deb80572773617f7eb202fa05388198
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Oct 31 12:55:13 2023 +0100
[FLINK-33412] Implement type inference for reinterpret_cast function
---
.../functions/BuiltInFunctionDefinitions.java | 3 +-
.../ReinterpretCastInputTypeStrategy.java | 98 ++++++++++++++++++++++
.../strategies/SpecificInputTypeStrategies.java | 2 +
.../types/logical/utils/LogicalTypeCasts.java | 48 +++++++++++
.../types/inference/InputTypeStrategiesTest.java | 29 ++++++-
.../inference/InputTypeStrategiesTestBase.java | 19 ++---
.../expressions/PlannerExpressionConverter.scala | 8 --
.../table/planner/expressions/Reinterpret.scala | 49 -----------
.../table/planner/typeutils/TypeCoercion.scala | 81 ------------------
9 files changed, 185 insertions(+), 152 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e653d1d6463..2d49dc30725 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -2254,7 +2254,8 @@ public final class BuiltInFunctionDefinitions {
BuiltInFunctionDefinition.newBuilder()
.name("reinterpretCast")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(SpecificInputTypeStrategies.REINTERPRET_CAST)
+ .outputTypeStrategy(TypeStrategies.argument(1))
.build();
public static final BuiltInFunctionDefinition AS =
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ReinterpretCastInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ReinterpretCastInputTypeStrategy.java
new file mode 100644
index 00000000000..d33ddcdb452
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ReinterpretCastInputTypeStrategy.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#REINTERPRET_CAST}.
+ *
+ * <p>It expects three arguments: the type of the first one must be reinterpretable as the type of
+ * the second one, which must be a type literal. The third one must be a non-null BOOLEAN literal
+ * that specifies whether the reinterpretation should check for an overflow.
+ */
+@Internal
+public final class ReinterpretCastInputTypeStrategy implements InputTypeStrategy {
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(3);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+
+        // The target type must be passed as a type literal carrying a DataType value.
+        if (!callContext.isArgumentLiteral(1)
+                || !callContext.getArgumentValue(1, DataType.class).isPresent()) {
+            return callContext.fail(
+                    throwOnFailure, "Expected type literal for the second argument.");
+        }
+
+        if (!argumentDataTypes.get(2).getLogicalType().is(LogicalTypeRoot.BOOLEAN)
+                || !callContext.isArgumentLiteral(2)
+                || callContext.isArgumentNull(2)) {
+            return callContext.fail(
+                    throwOnFailure, "Not null boolean literal expected for overflow.");
+        }
+
+        final LogicalType fromType = argumentDataTypes.get(0).getLogicalType();
+        final LogicalType toType = argumentDataTypes.get(1).getLogicalType();
+
+        // A hack to support legacy types. To be removed when we drop the legacy types.
+        if (fromType instanceof LegacyTypeInformationType) {
+            return Optional.of(argumentDataTypes);
+        }
+        if (!LogicalTypeCasts.supportsReinterpretCast(fromType, toType)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Unsupported reinterpret cast from '%s' to '%s'.",
+                    fromType,
+                    toType);
+        }
+
+        return Optional.of(argumentDataTypes);
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
+        return Collections.singletonList(
+                Signature.of(
+                        Signature.Argument.ofGroup("ANY"),
+                        Signature.Argument.ofGroup("TYPE LITERAL"),
+                        Signature.Argument.ofGroup("TRUE | FALSE")));
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index 56809c7e69c..b004ce20e9d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -50,6 +50,8 @@ public final class SpecificInputTypeStrategies {
/** See {@link CastInputTypeStrategy}. */
public static final InputTypeStrategy CAST = new CastInputTypeStrategy();
+    public static final InputTypeStrategy REINTERPRET_CAST = new ReinterpretCastInputTypeStrategy();
+
/** See {@link MapInputTypeStrategy}. */
public static final InputTypeStrategy MAP = new MapInputTypeStrategy();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index 72321b8accf..54c9c7d1260 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -288,6 +288,54 @@ public final class LogicalTypeCasts {
return supportsCasting(sourceType, targetType, true);
}
+    /**
+     * Returns whether the source type can be reinterpreted as the target type.
+     *
+     * <p>Reinterpret casts correspond to the SQL reinterpret_cast and represent the logic behind a
+     * {@code REINTERPRET_CAST(sourceType AS targetType)} operation.
+     */
+    public static boolean supportsReinterpretCast(LogicalType sourceType, LogicalType targetType) {
+        if (sourceType.getTypeRoot() == targetType.getTypeRoot()) {
+            return true;
+        }
+        // Different roots: only the INTEGER/BIGINT <-> date-time/interval pairs below are allowed.
+        switch (sourceType.getTypeRoot()) {
+            case INTEGER:
+                switch (targetType.getTypeRoot()) {
+                    case DATE:
+                    case TIME_WITHOUT_TIME_ZONE:
+                    case INTERVAL_YEAR_MONTH:
+                        return true;
+                    default:
+                        return false;
+                }
+            case BIGINT:
+                switch (targetType.getTypeRoot()) {
+                    case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    case INTERVAL_DAY_TIME:
+                        return true;
+                    default:
+                        return false;
+                }
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+                switch (targetType.getTypeRoot()) {
+                    case INTEGER:
+                    case BIGINT:
+                        return true;
+                    default:
+                        return false;
+                }
+
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case INTERVAL_DAY_TIME:
+                return targetType.getTypeRoot() == BIGINT;
+            default:
+                return false;
+        }
+    }
+
//
--------------------------------------------------------------------------------------------
private static boolean supportsCasting(
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 14c7c89a451..3a9139a645f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -636,7 +636,34 @@ class InputTypeStrategiesTest extends InputTypeStrategiesTestBase {
.expectSignature("f(<ARRAY>, <ARRAY ELEMENT>)")
.expectArgumentTypes(
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
- DataTypes.INT()));
+ DataTypes.INT()),
+ TestSpec.forStrategy(
+ "Reinterpret_cast strategy",
+ SpecificInputTypeStrategies.REINTERPRET_CAST)
+ .calledWithArgumentTypes(
+ DataTypes.DATE(), DataTypes.BIGINT(),
DataTypes.BOOLEAN().notNull())
+ .calledWithLiteralAt(1, DataTypes.BIGINT())
+ .calledWithLiteralAt(2, true)
+ .expectSignature("f(<ANY>, <TYPE LITERAL>, <TRUE |
FALSE>)")
+ .expectArgumentTypes(
+ DataTypes.DATE(),
+ DataTypes.BIGINT(),
+ DataTypes.BOOLEAN().notNull()),
+ TestSpec.forStrategy(
+ "Reinterpret_cast strategy non literal
overflow",
+ SpecificInputTypeStrategies.REINTERPRET_CAST)
+ .calledWithArgumentTypes(
+ DataTypes.DATE(), DataTypes.BIGINT(),
DataTypes.BOOLEAN().notNull())
+ .calledWithLiteralAt(1, DataTypes.BIGINT())
+ .expectErrorMessage("Not null boolean literal expected
for overflow."),
+ TestSpec.forStrategy(
+ "Reinterpret_cast strategy not supported cast",
+ SpecificInputTypeStrategies.REINTERPRET_CAST)
+ .calledWithArgumentTypes(
+ DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.BOOLEAN().notNull())
+ .calledWithLiteralAt(1, DataTypes.BIGINT())
+ .calledWithLiteralAt(2, true)
+ .expectErrorMessage("Unsupported reinterpret cast from
'INT' to 'BIGINT'"));
}
/** Simple pojo that should be converted to a Structured type. */
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
index 73c9e610119..06ff25f9a9a 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
@@ -35,7 +35,9 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -92,15 +94,11 @@ public abstract class InputTypeStrategiesTestBase {
callContextMock.argumentDataTypes = actualArgumentTypes;
callContextMock.argumentLiterals =
IntStream.range(0, actualArgumentTypes.size())
- .mapToObj(i -> testSpec.literalPos != null && i ==
testSpec.literalPos)
+ .mapToObj(i -> testSpec.literals.containsKey(i))
.collect(Collectors.toList());
callContextMock.argumentValues =
IntStream.range(0, actualArgumentTypes.size())
- .mapToObj(
- i ->
- (testSpec.literalPos != null && i ==
testSpec.literalPos)
- ?
Optional.ofNullable(testSpec.literalValue)
- : Optional.empty())
+ .mapToObj(i ->
Optional.ofNullable(testSpec.literals.get(i)))
.collect(Collectors.toList());
callContextMock.argumentNulls =
IntStream.range(0, actualArgumentTypes.size())
@@ -161,9 +159,7 @@ public abstract class InputTypeStrategiesTestBase {
private List<List<DataType>> actualArgumentTypes = new ArrayList<>();
- private @Nullable Integer literalPos;
-
- private @Nullable Object literalValue;
+ private Map<Integer, Object> literals = new HashMap<>();
private @Nullable InputTypeStrategy surroundingStrategy;
@@ -207,13 +203,12 @@ public abstract class InputTypeStrategiesTestBase {
}
public TestSpec calledWithLiteralAt(int pos) {
- this.literalPos = pos;
+ this.literals.put(pos, null);
return this;
}
public TestSpec calledWithLiteralAt(int pos, Object value) {
- this.literalPos = pos;
- this.literalValue = value;
+ this.literals.put(pos, value);
return this;
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 8983fbbd64a..953da5ede4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -69,14 +69,6 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
// special case: requires individual handling of child expressions
func match {
- case REINTERPRET_CAST =>
- assert(children.size == 3)
- return Reinterpret(
- children.head.accept(this),
-
fromDataTypeToTypeInfo(children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType),
- getValue[Boolean](children(2).accept(this))
- )
-
case WINDOW_START =>
assert(children.size == 1)
val windowReference = translateWindowReference(children.head)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/Reinterpret.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/Reinterpret.scala
deleted file mode 100644
index 13998c7b7d6..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/Reinterpret.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.planner.typeutils.TypeCoercion
-import org.apache.flink.table.planner.validate._
-import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-
-case class Reinterpret(
- child: PlannerExpression,
- resultType: TypeInformation[_],
- checkOverflow: Boolean)
- extends UnaryExpression {
-
- override def toString = s"$child.reinterpret($resultType)"
-
- override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
- val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
- copy(child, resultType).asInstanceOf[this.type]
- }
-
- override private[flink] def validateInput(): ValidationResult = {
- if (
- TypeCoercion.canReinterpret(
- fromTypeInfoToLogicalType(child.resultType),
- fromTypeInfoToLogicalType(resultType))
- ) {
- ValidationSuccess
- } else {
- ValidationFailure(s"Unsupported reinterpret from ${child.resultType} to
$resultType")
- }
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/typeutils/TypeCoercion.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/typeutils/TypeCoercion.scala
index 6e284464225..c6a06b8c64e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/typeutils/TypeCoercion.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/typeutils/TypeCoercion.scala
@@ -53,85 +53,4 @@ object TypeCoercion {
case _ => false
}
-
- /**
- * All the supported cast types in flink-table.
- *
- * Note: No distinction between explicit and implicit conversions Note: This
is a subset of
- * SqlTypeAssignmentRule Note: This may lose type during the cast.
- */
- def canCast(from: LogicalType, to: LogicalType): Boolean =
- (from.getTypeRoot, to.getTypeRoot) match {
- case (_, _) if from == to => true
-
- case (_, VARCHAR | CHAR) => true
-
- case (VARCHAR | CHAR, _) if isNumeric(to) => true
- case (VARCHAR | CHAR, BOOLEAN) => true
- case (VARCHAR | CHAR, DECIMAL) => true
- case (VARCHAR | CHAR, DATE) => true
- case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) => true
- case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (VARCHAR | CHAR, TIMESTAMP_WITH_LOCAL_TIME_ZONE) => true
-
- case (BOOLEAN, _) if isNumeric(to) => true
- case (BOOLEAN, DECIMAL) => true
- case (_, BOOLEAN) if isNumeric(from) => true
- case (DECIMAL, BOOLEAN) => true
-
- case (_, _) if isNumeric(from) && isNumeric(to) => true
- case (_, DECIMAL) if isNumeric(from) => true
- case (DECIMAL, _) if isNumeric(to) => true
- case (DECIMAL, DECIMAL) => true
- case (INTEGER, DATE) => true
- case (INTEGER, TIME_WITHOUT_TIME_ZONE) => true
- case (TINYINT, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (SMALLINT, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (INTEGER, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (BIGINT, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (DOUBLE, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (FLOAT, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (INTEGER, INTERVAL_YEAR_MONTH) => true
- case (BIGINT, INTERVAL_DAY_TIME) => true
-
- case (DATE, TIME_WITHOUT_TIME_ZONE) => false
- case (TIME_WITHOUT_TIME_ZONE, DATE) => false
- case (_, _) if isTimePoint(from) && isTimePoint(to) => true
- case (DATE, INTEGER) => true
- case (TIME_WITHOUT_TIME_ZONE, INTEGER) => true
- case (TIMESTAMP_WITHOUT_TIME_ZONE, TINYINT) => true
- case (TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER) => true
- case (TIMESTAMP_WITHOUT_TIME_ZONE, SMALLINT) => true
- case (TIMESTAMP_WITHOUT_TIME_ZONE, BIGINT) => true
- case (TIMESTAMP_WITHOUT_TIME_ZONE, DOUBLE) => true
- case (TIMESTAMP_WITHOUT_TIME_ZONE, FLOAT) => true
-
- case (INTERVAL_YEAR_MONTH, INTEGER) => true
- case (INTERVAL_DAY_TIME, BIGINT) => true
-
- case _ => false
- }
-
- /** All the supported reinterpret types in flink-table. */
- def canReinterpret(from: LogicalType, to: LogicalType): Boolean =
- (from.getTypeRoot, to.getTypeRoot) match {
- case (_, _) if from == to => true
-
- case (DATE, INTEGER) => true
- case (TIME_WITHOUT_TIME_ZONE, INTEGER) => true
- case (TIMESTAMP_WITHOUT_TIME_ZONE, BIGINT) => true
- case (INTEGER, DATE) => true
- case (INTEGER, TIME_WITHOUT_TIME_ZONE) => true
- case (BIGINT, TIMESTAMP_WITHOUT_TIME_ZONE) => true
- case (INTEGER, INTERVAL_YEAR_MONTH) => true
- case (BIGINT, INTERVAL_DAY_TIME) => true
- case (INTERVAL_YEAR_MONTH, INTEGER) => true
- case (INTERVAL_DAY_TIME, BIGINT) => true
-
- case (DATE, BIGINT) => true
- case (TIME_WITHOUT_TIME_ZONE, BIGINT) => true
- case (INTERVAL_YEAR_MONTH, BIGINT) => true
-
- case _ => false
- }
}