This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 0625b31 [FLINK-18673][table] Improve support for ROW constructor
0625b31 is described below
commit 0625b31521247d03b1e2b5280120c6546865da49
Author: Timo Walther <[email protected]>
AuthorDate: Fri Nov 13 15:52:18 2020 +0100
[FLINK-18673][table] Improve support for ROW constructor
This closes #14067.
---
.../types/logical/utils/LogicalTypeCasts.java | 22 +++++++++++-----------
.../flink/table/types/LogicalTypeCastsTest.java | 12 ++++++++++++
.../planner/calcite/FlinkCalciteSqlValidator.java | 10 ++++++++++
.../table/planner/calcite/FlinkTypeFactory.scala | 5 +++++
4 files changed, 38 insertions(+), 11 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index ea63e0a..56d5b27 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -289,14 +289,9 @@ public final class LogicalTypeCasts {
final LogicalTypeRoot sourceRoot = sourceType.getTypeRoot();
final LogicalTypeRoot targetRoot = targetType.getTypeRoot();
- if (hasFamily(sourceType, INTERVAL) && hasFamily(targetType,
EXACT_NUMERIC)) {
- // cast between interval and exact numeric is only
supported if interval has a single field
- return isSingleFieldInterval(sourceType);
- } else if (hasFamily(sourceType, EXACT_NUMERIC) &&
hasFamily(targetType, INTERVAL)) {
- // cast between interval and exact numeric is only
supported if interval has a single field
- return isSingleFieldInterval(targetType);
- } else if (hasFamily(sourceType, CONSTRUCTED) ||
hasFamily(targetType, CONSTRUCTED)) {
- return supportsConstructedCasting(sourceType,
targetType, allowExplicit);
+ if (sourceRoot == NULL) {
+ // null can be cast to an arbitrary type
+ return true;
} else if (sourceRoot == DISTINCT_TYPE && targetRoot ==
DISTINCT_TYPE) {
// the two distinct types are not equal (from initial
invariant), casting is not possible
return false;
@@ -304,12 +299,17 @@ public final class LogicalTypeCasts {
return supportsCasting(((DistinctType)
sourceType).getSourceType(), targetType, allowExplicit);
} else if (targetRoot == DISTINCT_TYPE) {
return supportsCasting(sourceType, ((DistinctType)
targetType).getSourceType(), allowExplicit);
+ } else if (hasFamily(sourceType, INTERVAL) &&
hasFamily(targetType, EXACT_NUMERIC)) {
+ // cast between interval and exact numeric is only
supported if interval has a single field
+ return isSingleFieldInterval(sourceType);
+ } else if (hasFamily(sourceType, EXACT_NUMERIC) &&
hasFamily(targetType, INTERVAL)) {
+ // cast between interval and exact numeric is only
supported if interval has a single field
+ return isSingleFieldInterval(targetType);
+ } else if (hasFamily(sourceType, CONSTRUCTED) ||
hasFamily(targetType, CONSTRUCTED)) {
+ return supportsConstructedCasting(sourceType,
targetType, allowExplicit);
} else if (sourceRoot == STRUCTURED_TYPE || targetRoot ==
STRUCTURED_TYPE) {
// inheritance is not supported yet, so structured type
must be fully equal
return false;
- } else if (sourceRoot == NULL) {
- // null can be cast to an arbitrary type
- return true;
} else if (sourceRoot == RAW || targetRoot == RAW) {
// the two raw types are not equal (from initial
invariant), casting is not possible
return false;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
index 03fb1b7..b817754 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
@@ -106,6 +106,18 @@ public class LogicalTypeCastsTest {
{new NullType(), new IntType(), true, true},
+ {
+ new NullType(),
+ new RowType(
+ Arrays.asList(
+ new RowField("f1", new
IntType()),
+ new RowField("f2", new
IntType())
+ )
+ ),
+ true,
+ true
+ },
+
{new ArrayType(new IntType()), new
ArrayType(new BigIntType()), true, true},
{new ArrayType(new IntType()), new
ArrayType(new VarCharType(Integer.MAX_VALUE)), false, true},
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index cfffc62..50f0d32 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -22,8 +22,10 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
@@ -38,6 +40,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.util.Static;
import java.math.BigDecimal;
+import java.util.List;
import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
@@ -85,4 +88,11 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
}
super.validateJoin(join, scope);
}
+
+ @Override
+ public void validateColumnListParams(SqlFunction function,
List<RelDataType> argTypes, List<SqlNode> operands) {
+ // we don't support column lists and translate them into the unknown type in the type factory,
+ // this makes it possible to ignore them in the validator and fall back to regular row types
+ // see also SqlFunction#deriveType
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 57d64f8..acc09da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -311,6 +311,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
// keep precision/scale in sync with our type system's default value,
// see DecimalType.USER_DEFAULT.
createSqlType(typeName, DecimalType.DEFAULT_PRECISION,
DecimalType.DEFAULT_SCALE)
+ } else if (typeName == COLUMN_LIST) {
+ // we don't support column lists and translate them into the unknown type,
+ // this makes it possible to ignore them in the validator and fall back to regular row types
+ // see also SqlFunction#deriveType
+ createUnknownType()
} else {
super.createSqlType(typeName)
}