This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit aea176e1fc15a9cf0388e8b32ce3bb3688e876bf Author: Timo Walther <[email protected]> AuthorDate: Wed Mar 9 15:38:52 2022 +0100 [FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits This closes #19137. --- .../operations/utils/CalculatedTableFactory.java | 6 ++- .../table/api/ImplicitExpressionConversions.scala | 8 +-- .../calcite/sql/validate/ProcedureNamespace.java | 35 ++++++------- .../table/planner/calcite/FlinkRelBuilder.java | 59 ++++++++++++++++++++++ .../functions/bridging/BridgingSqlFunction.java | 39 +++++++++++++- .../planner/plan/QueryOperationConverter.java | 12 +++-- .../planner/plan/utils/SetOpRewriteUtil.scala | 12 +++-- .../planner/runtime/stream/sql/FunctionITCase.java | 33 +++++------- .../runtime/stream/table/FunctionITCase.java | 39 +++++++------- .../utils/JavaUserDefinedTableFunctions.java | 6 ++- .../planner/plan/batch/sql/SetOperatorsTest.xml | 8 +-- .../planner/plan/batch/table/CorrelateTest.xml | 28 +++++----- .../planner/plan/batch/table/SetOperatorsTest.xml | 8 +-- .../stringexpr/CorrelateStringExpressionTest.xml | 34 ++++++------- .../planner/plan/common/PartialInsertTest.xml | 16 +++--- .../rules/logical/RewriteIntersectAllRuleTest.xml | 12 ++--- .../plan/rules/logical/RewriteMinusAllRuleTest.xml | 12 ++--- .../planner/plan/stream/sql/SetOperatorsTest.xml | 8 +-- .../plan/stream/table/ColumnFunctionsTest.xml | 4 +- .../planner/plan/stream/table/CorrelateTest.xml | 51 +++++++++---------- .../stream/table/TemporalTableFunctionJoinTest.xml | 10 ++-- .../planner/plan/batch/sql/TableSourceTest.scala | 1 - .../planner/plan/batch/table/CorrelateTest.scala | 12 ----- .../stringexpr/CorrelateStringExpressionTest.scala | 4 -- .../table/validation/CorrelateValidationTest.scala | 1 - .../planner/plan/stream/table/CorrelateTest.scala | 17 ------- .../table/validation/CorrelateValidationTest.scala | 3 +- .../runtime/batch/table/CorrelateITCase.scala | 3 +- .../planner/runtime/utils/StreamingTestBase.scala | 3 ++ .../planner/utils/UserDefinedTableFunctions.scala | 24 +++------ 30 files changed, 274 insertions(+), 234 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java index 09b9eca..6d20abd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.table.expressions.ExpressionUtils; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.operations.CalculatedQueryOperation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.types.DataType; @@ -38,6 +39,7 @@ import java.util.Collections; import java.util.List; import static java.util.stream.Collectors.toList; +import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS; /** Utility class for creating a valid {@link CalculatedQueryOperation} operation. */ @@ -89,7 +91,7 @@ final class CalculatedTableFactory { + alias))) .collect(toList()); - if (!(children.get(0) instanceof CallExpression)) { + if (!isFunctionOfKind(children.get(0), FunctionKind.TABLE)) { throw fail(); } @@ -156,7 +158,7 @@ final class CalculatedTableFactory { private ValidationException fail() { return new ValidationException( - "A lateral join only accepts a string expression which defines a table function " + "A lateral join only accepts an expression which defines a table function " + "call that might be followed by some alias."); } } diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala index b2c0ca2..c36c3d1 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala @@ -158,17 +158,13 @@ trait ImplicitExpressionConversions { } } - implicit class TableFunctionCall[T: TypeInformation](val t: TableFunction[T]) { + implicit class TableFunctionCall(val t: TableFunction[_]) { /** * Calls a table function for the given parameters. */ def apply(params: Expression*): Expression = { - val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper - .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]]) - unresolvedCall( - new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo), - params.map(ApiExpressionUtils.objectToExpression): _*) + unresolvedCall(t, params.map(ApiExpressionUtils.objectToExpression): _*) } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java index cf9beec..22e9380 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java @@ -21,13 +21,12 @@ import org.apache.flink.annotation.Internal; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlCallBinding; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlTableFunction; -import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.type.SqlReturnTypeInference; -import org.apache.calcite.sql.type.SqlTypeName; + +import static java.util.Objects.requireNonNull; /** * Namespace whose contents are defined by the result of a call to a user-defined procedure. @@ -56,25 +55,23 @@ public final class ProcedureNamespace extends AbstractNamespace { public RelDataType validateImpl(RelDataType targetRowType) { validator.inferUnknownTypes(validator.unknownType, scope, call); - final RelDataType type = validator.deriveTypeImpl(scope, call); + // The result is ignored but the type is derived to trigger the validation + validator.deriveTypeImpl(scope, call); final SqlOperator operator = call.getOperator(); final SqlCallBinding callBinding = new SqlCallBinding(validator, scope, call); - if (operator instanceof SqlTableFunction) { - final SqlTableFunction tableFunction = (SqlTableFunction) operator; - if (type.getSqlTypeName() != SqlTypeName.CURSOR) { - throw new IllegalArgumentException( - "Table function should have CURSOR " + "type, not " + type); - } - final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference(); - RelDataType retType = rowTypeInference.inferReturnType(callBinding); - return validator.getTypeFactory().createTypeWithNullability(retType, false); - } - - // special handling of collection tables TABLE(function(...)) - if (SqlUtil.stripAs(enclosingNode).getKind() == SqlKind.COLLECTION_TABLE) { - return toStruct(type, getNode()); + if (!(operator instanceof SqlTableFunction)) { + throw new IllegalArgumentException( + "Argument must be a table function: " + operator.getNameAsId()); } - return type; + final SqlTableFunction tableFunction = (SqlTableFunction) operator; + final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference(); + final RelDataType rowRelDataType = + requireNonNull( + rowTypeInference.inferReturnType(callBinding), + () -> "got null from inferReturnType for call " + callBinding.getCall()); + // For BridgingSqlFunction the type can still be atomic + // and will be wrapped with a proper field alias + return toStruct(rowRelDataType, getNode()); } /** Converts a type to a struct if it is not already. */ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java index 35ab473..c5b774d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java @@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory; import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.plan.QueryOperationConverter; import org.apache.flink.table.planner.plan.logical.LogicalWindow; @@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggre import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; import org.apache.flink.table.runtime.operators.rank.RankRange; import org.apache.flink.table.runtime.operators.rank.RankType; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; @@ -40,6 +42,7 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptTable.ToRelContext; import org.apache.calcite.plan.ViewExpanders; @@ -48,18 +51,26 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalTableFunctionScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Util; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate; @@ -105,6 +116,54 @@ public final class FlinkRelBuilder extends RelBuilder { }; } + /** + * {@link RelBuilder#functionScan(SqlOperator, int, Iterable)} cannot work smoothly with aliases + * which is why we implement a custom one. The method is static because some {@link RelOptRule}s + * don't use {@link FlinkRelBuilder}. + */ + public static RelBuilder pushFunctionScan( + RelBuilder relBuilder, + SqlOperator operator, + int inputCount, + Iterable<RexNode> operands, + List<String> aliases) { + Preconditions.checkArgument( + operator instanceof BridgingSqlFunction.WithTableFunction, + "Table function expected."); + final RexBuilder rexBuilder = relBuilder.getRexBuilder(); + final RelDataTypeFactory typeFactory = relBuilder.getTypeFactory(); + + final List<RelNode> inputs = new LinkedList<>(); + for (int i = 0; i < inputCount; i++) { + inputs.add(0, relBuilder.build()); + } + + final List<RexNode> operandList = CollectionUtil.iterableToList(operands); + + final RelDataType functionRelDataType = rexBuilder.deriveReturnType(operator, operandList); + final List<RelDataType> fieldRelDataTypes; + if (functionRelDataType.isStruct()) { + fieldRelDataTypes = + functionRelDataType.getFieldList().stream() + .map(RelDataTypeField::getType) + .collect(Collectors.toList()); + } else { + fieldRelDataTypes = Collections.singletonList(functionRelDataType); + } + final RelDataType rowRelDataType = typeFactory.createStructType(fieldRelDataTypes, aliases); + + final RexNode call = rexBuilder.makeCall(rowRelDataType, operator, operandList); + final RelNode functionScan = + LogicalTableFunctionScan.create( + relBuilder.getCluster(), + inputs, + call, + null, + rowRelDataType, + Collections.emptySet()); + return relBuilder.push(functionScan); + } + public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) { final RelNode input = build(); final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 5dbe4d8..d661787 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -26,6 +26,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.utils.ShortcutUtils; import org.apache.flink.table.types.DataType; @@ -35,6 +36,10 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlTableFunction; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.calcite.tools.RelBuilder; import java.util.List; @@ -52,7 +57,7 @@ import static org.apache.flink.util.Preconditions.checkState; * (either a system or user-defined function). */ @Internal -public final class BridgingSqlFunction extends SqlFunction { +public class BridgingSqlFunction extends SqlFunction { private final DataTypeFactory dataTypeFactory; @@ -108,6 +113,10 @@ public final class BridgingSqlFunction extends SqlFunction { functionKind == FunctionKind.SCALAR || functionKind == FunctionKind.TABLE, "Scalar or table function kind expected."); + if (functionKind == FunctionKind.TABLE) { + return new BridgingSqlFunction.WithTableFunction( + dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference); + } return new BridgingSqlFunction( dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference); } @@ -177,4 +186,32 @@ public final class BridgingSqlFunction extends SqlFunction { public boolean isDeterministic() { return resolvedFunction.getDefinition().isDeterministic(); } + + // -------------------------------------------------------------------------------------------- + // Table function extension + // -------------------------------------------------------------------------------------------- + + /** Special flavor of {@link BridgingSqlFunction} to indicate a table function to Calcite. */ + public static class WithTableFunction extends BridgingSqlFunction implements SqlTableFunction { + + private WithTableFunction( + DataTypeFactory dataTypeFactory, + FlinkTypeFactory typeFactory, + SqlKind kind, + ContextResolvedFunction resolvedFunction, + TypeInference typeInference) { + super(dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference); + } + + /** + * The conversion to a row type is handled on the caller side. This allows us to perform it + * SQL/Table API-specific. This is in particular important to set the aliases of fields + * correctly (see {@link FlinkRelBuilder#pushFunctionScan(RelBuilder, SqlOperator, int, + * Iterable, List)}). + */ + @Override + public SqlReturnTypeInference getRowTypeInference() { + return getReturnTypeInference(); + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java index 837b3b2..3fcf62fa 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java @@ -305,10 +305,14 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod final BridgingSqlFunction sqlFunction = BridgingSqlFunction.of(relBuilder.getCluster(), resolvedFunction); - return relBuilder - .functionScan(sqlFunction, 0, parameters) - .rename(calculatedTable.getResolvedSchema().getColumnNames()) - .build(); + FlinkRelBuilder.pushFunctionScan( + relBuilder, + sqlFunction, + 0, + parameters, + calculatedTable.getResolvedSchema().getColumnNames()); + + return relBuilder.build(); } private RelNode convertLegacyTableFunction( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala index eb6d422..32e400f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.utils import org.apache.flink.table.functions.BuiltInFunctionDefinitions +import org.apache.flink.table.planner.calcite.FlinkRelBuilder import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction import org.apache.calcite.plan.RelOptUtil @@ -73,12 +74,15 @@ object SetOpRewriteUtil { val cluster = relBuilder.getCluster val sqlFunction = BridgingSqlFunction.of( - relBuilder.getCluster, + cluster, BuiltInFunctionDefinitions.INTERNAL_REPLICATE_ROWS) - relBuilder - .functionScan(sqlFunction, 0, relBuilder.fields(Util.range(fields.size() + 1))) - .rename(outputRelDataType.getFieldNames) + FlinkRelBuilder.pushFunctionScan( + relBuilder, + sqlFunction, + 0, + relBuilder.fields(Util.range(fields.size() + 1)), + outputRelDataType.getFieldNames) // correlated join val corSet = Collections.singleton(cluster.createCorrel()) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java index d39d180..ed19aba 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java @@ -64,6 +64,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -925,16 +926,12 @@ public class FunctionITCase extends StreamingTestBase { public void testInvalidUseOfSystemScalarFunction() { tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')"); - try { - tEnv().explainSql("INSERT INTO SinkTable " + "SELECT * FROM TABLE(MD5('3'))"); - fail(); - } catch (ValidationException e) { - assertThat( - e, - hasMessage( - containsString( - "Currently, only table functions can be used in a correlate operation."))); - } + assertThatThrownBy( + () -> + tEnv().explainSql( + "INSERT INTO SinkTable " + + "SELECT * FROM TABLE(MD5('3'))")) + .hasMessageContaining("Argument must be a table function: MD5"); } @Test @@ -946,16 +943,12 @@ public class FunctionITCase extends StreamingTestBase { tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class); - try { - tEnv().explainSql("INSERT INTO SinkTable " + "SELECT RowTableFunction('test')"); - fail(); - } catch (ValidationException e) { - assertThat( - e, - hasMessage( - containsString( - "Currently, only scalar functions can be used in a projection or filter operation."))); - } + assertThatThrownBy( + () -> + tEnv().explainSql( + "INSERT INTO SinkTable " + + "SELECT RowTableFunction('test')")) + .hasMessageContaining("Cannot call table function here: 'RowTableFunction'"); } @Test diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java index 6891d62..06af8ef 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java @@ -28,27 +28,23 @@ import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; import org.apache.flink.types.Row; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.List; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; -import static org.hamcrest.CoreMatchers.containsString; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; /** Tests for user defined functions in the Table API. */ public class FunctionITCase extends StreamingTestBase { - @Rule public ExpectedException thrown = ExpectedException.none(); - @Test - public void testScalarFunction() throws Exception { + void testScalarFunction() throws Exception { final List<Row> sourceData = Arrays.asList(Row.of(1, 1L, 1L), Row.of(2, 2L, 1L), Row.of(3, 3L, 1L)); @@ -61,7 +57,7 @@ public class FunctionITCase extends StreamingTestBase { tEnv().executeSql( "CREATE TABLE TestTable(a INT, b BIGINT, c BIGINT) WITH ('connector' = 'COLLECTION')"); - Table table = + final Table table = tEnv().from("TestTable") .select( $("a"), @@ -75,7 +71,7 @@ public class FunctionITCase extends StreamingTestBase { } @Test - public void testJoinWithTableFunction() throws Exception { + void testJoinWithTableFunction() throws Exception { final List<Row> sourceData = Arrays.asList( Row.of("1,2,3"), Row.of("2,3,4"), Row.of("3,4,5"), Row.of((String) null)); @@ -103,23 +99,22 @@ public class FunctionITCase extends StreamingTestBase { } @Test - public void testLateralJoinWithScalarFunction() throws Exception { - thrown.expect(ValidationException.class); - thrown.expect( - hasMessage( - containsString( - "Currently, only table functions can be used in a correlate operation."))); - + void testLateralJoinWithScalarFunction() throws Exception { TestCollectionTableFactory.reset(); tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')"); tEnv().executeSql( "CREATE TABLE SinkTable(s STRING, sa ARRAY<STRING>) WITH ('connector' = 'COLLECTION')"); - tEnv().from("SourceTable") - .joinLateral(call(new RowScalarFunction(), $("s")).as("a", "b")) - .select($("a"), $("b")) - .executeInsert("SinkTable") - .await(); + assertThatThrownBy( + () -> { + tEnv().from("SourceTable") + .joinLateral( + call(new RowScalarFunction(), $("s")).as("a", "b")); + }) + .satisfies( + anyCauseMatches( + ValidationException.class, + "A lateral join only accepts an expression which defines a table function")); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java index 95f6e51..9d7d559 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.runtime.utils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.functions.TableFunction; @@ -34,7 +35,10 @@ public class JavaUserDefinedTableFunctions { /** Emit inputs as long. */ public static class JavaTableFunc0 extends TableFunction<Long> { - public void eval(Integer a, Long b, TimestampData c) { + public void eval( + @DataTypeHint("DATE") Integer a, + Long b, + @DataTypeHint("TIMESTAMP(0)") TimestampData c) { collect(a.longValue()); collect(b); collect(c.getMillisecond()); diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml index d34b6e8..7b89148 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml @@ -57,8 +57,8 @@ LogicalIntersect(all=[true]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[f0 AS c]) -+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +Calc(select=[c0 AS c]) ++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER]) +- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))]) +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt]) +- Exchange(distribution=[hash[c]]) @@ -180,8 +180,8 @@ LogicalMinus(all=[true]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[f0 AS c]) -+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +Calc(select=[c0 AS c]) ++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER]) +- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)]) +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_vcol_marker]) +- Exchange(distribution=[hash[c]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml index d6347f7..a2d90a0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml @@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, d]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -42,12 +42,12 @@ Calc(select=[c, d]) <![CDATA[ LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) -+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;]) ++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER]) +PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -62,13 +62,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, d]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -79,13 +79,13 @@ Calc(select=[c, d]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -96,13 +96,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -114,13 +114,13 @@ LogicalFilter(condition=[>($1, _UTF-16LE'')]) +- LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s], where=[(s > '')]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -131,13 +131,13 @@ Calc(select=[c, s], where=[(s > '')]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml index 598ee0e..d31bda9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml @@ -33,8 +33,8 @@ Calc(select=[EXPR$0 AS a, b, EXPR$1 AS c]) +- HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_SUM(sum$0) AS EXPR$0, Final_COUNT(count$1) AS EXPR$1]) +- Exchange(distribution=[hash[b]]) +- LocalHashAggregate(groupBy=[b], select=[b, Partial_SUM(a) AS sum$0, Partial_COUNT(c) AS count$1]) - +- Calc(select=[f0 AS a, f1 AS b, f2 AS c]) - +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,f0,f1,f2], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)], joinType=[INNER]) + +- Calc(select=[a0 AS a, b0 AS b, c0 AS c]) + +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,a0,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER a0, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER]) +- Calc(select=[sum_vcol_marker, a, b, c], where=[(sum_vcol_marker > 0)]) +- HashAggregate(isMerge=[true], groupBy=[a, b, c], select=[a, b, c, Final_SUM(sum$0) AS sum_vcol_marker]) +- Exchange(distribution=[hash[a, b, c]]) @@ -123,8 +123,8 @@ LogicalProject(b=[$1], c=[$2]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[f0 AS b, f1 AS c]) -+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,f0,f1], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT f0, VARCHAR(2147483647) f1)], joinType=[INNER]) +Calc(select=[b0 AS b, c0 AS c]) ++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER]) +- Calc(select=[sum_vcol_marker, b, c], where=[(sum_vcol_marker > 0)]) +- HashAggregate(isMerge=[true], groupBy=[b, c], select=[b, c, Final_SUM(sum$0) AS sum_vcol_marker]) +- Exchange(distribution=[hash[b, c]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml index 09262da..e698687 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml @@ -22,13 +22,13 @@ limitations under the License. LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -39,13 +39,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -56,13 +56,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -73,13 +73,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, name, len]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -90,13 +90,13 @@ Calc(select=[c, name, len]) LogicalProject(c=[$2], name=[$3], len=[$5], adult=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, name, len, adult]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(*org.apache.flink.table.planner.utils.HierarchyTableFunction*(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -104,16 +104,16 @@ Calc(select=[c, name, len, adult]) <TestCase name="testCorrelateJoins6"> <Resource name="ast"> <![CDATA[ -LogicalProject(c=[$2], name=[$4], age=[$3]) +LogicalProject(c=[$2], name=[$3], age=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, name, age]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(*org.apache.flink.table.planner.utils.PojoTableFunc*(c))], select=[a,b,c,name,age], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -125,13 +125,13 @@ LogicalFilter(condition=[>($2, 2)]) +- LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, name, len]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -142,13 +142,13 @@ Calc(select=[c, name, len]) LogicalProject(a=[$0], c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml index 067ed8e..1ede5ab 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml @@ -127,8 +127,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a <Resource name="optimized rel plan"> <![CDATA[ Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g]) -+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g]) - +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER]) ++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g]) + +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER]) +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)]) +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, SUM_RETRACT(vcol_marker) AS sum_vcol_marker]) +- Exchange(distribution=[hash[a, c, d, e, f, g]]) @@ -165,8 +165,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a <![CDATA[ Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g]) +- Sort(orderBy=[c ASC, d ASC]) - +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g]) - +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER]) + +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g]) + +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER]) +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)]) +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_SUM(sum$0) AS sum_vcol_marker]) +- Exchange(distribution=[hash[a, c, d, e, f, g]]) @@ -205,8 +205,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a <Resource name="optimized rel plan"> <![CDATA[ Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g]) -+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g]) - +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER]) ++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g]) + +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER]) +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))]) +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, COUNT_RETRACT(vcol_left_marker) AS vcol_left_cnt, COUNT_RETRACT(vcol_right_marker) AS vcol_right_cnt]) +- Exchange(distribution=[hash[a, c, d, e, f, g]]) @@ -243,8 +243,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a <![CDATA[ Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g]) +- Sort(orderBy=[c ASC, d ASC]) - +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g]) - +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER]) + +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g]) + +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER]) +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))]) +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt]) +- Exchange(distribution=[hash[a, c, d, e, f, g]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml index c5a3d2f..ad832c4 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml @@ -43,8 +43,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(c=[$0]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> @@ -77,8 +76,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(c=[$0]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> @@ -113,8 +111,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true]) : +- LogicalProject(d=[$0], e=[$1], f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(a=[$0], b=[$1], c=[$2]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> @@ -147,8 +144,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$2]) : +- LogicalFilter(condition=[=(1, 0)]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(c=[$0]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml index ae6d9dc..e895649 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml @@ -43,8 +43,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(c=[$0]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> @@ -77,8 +76,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT]) : +- LogicalProject(f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(c=[$0]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> @@ -111,8 +109,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(f=[$2]) : +- LogicalFilter(condition=[=(1, 0)]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(c=[$0]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> @@ -147,8 +144,7 @@ LogicalProject(c=[$2]) : +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_marker=[-1:BIGINT]) : +- LogicalProject(d=[$0], e=[$1], f=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]]) - +- LogicalProject(a=[$0], b=[$1], c=[$2]) - +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)]) + +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml index 59d848b..61552b1 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml @@ -58,8 +58,8 @@ LogicalIntersect(all=[true]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[f0 AS c]) -+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +Calc(select=[c0 AS c]) ++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER]) +- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))]) +- GroupAggregate(groupBy=[c], select=[c, COUNT(vcol_left_marker) AS vcol_left_cnt, COUNT(vcol_right_marker) AS vcol_right_cnt]) +- Exchange(distribution=[hash[c]]) @@ -182,8 +182,8 @@ LogicalMinus(all=[true]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[f0 AS c]) -+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +Calc(select=[c0 AS c]) ++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER]) +- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)]) +- GroupAggregate(groupBy=[c], select=[c, SUM(vcol_marker) AS sum_vcol_marker]) +- Exchange(distribution=[hash[c]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml index 1c8dab3..063fcca 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml @@ -153,12 +153,12 @@ Join(joinType=[InnerJoin], where=[(int1 = int2)], select=[int1, long1, string1, <![CDATA[ LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]]) -+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)], elementType=[class [Ljava.lang.Object;]) ++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER]) +Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]], fields=[int, long, string]) ]]> </Resource> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml index cbee6a2..2542e8d 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml @@ -21,12 +21,12 @@ limitations under the License. <![CDATA[ LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) -+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;]) ++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER]) +PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -63,13 +63,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, d]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -84,13 +84,13 @@ LogicalProject(c=[$0], d=[$1]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalProject(a=[$0], b=[$1], c=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, d]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -101,13 +101,13 @@ Calc(select=[c, d]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -118,13 +118,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -135,13 +135,13 @@ Calc(select=[c, s]) LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, name, len]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(TableFunc2(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -153,13 +153,13 @@ LogicalFilter(condition=[>($2, 2)]) +- LogicalProject(c=[$2], name=[$3], len=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, name, len]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -170,14 +170,13 @@ Calc(select=[c, name, len]) LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]]) - +- LogicalProject(f0=[$0], f1_0=[$1]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[f0, f10 AS f1]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER]) +Calc(select=[f0, f1_0 AS f1]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f1_0], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f1_0)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3]) ]]> </Resource> @@ -209,12 +208,12 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John')], correlate=[table(str_spl <![CDATA[ LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) -+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;]) ++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER]) +Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(*org.apache.flink.table.planner.utils.HierarchyTableFunction*(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -291,13 +290,13 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')], correlate= LogicalProject(c=[$2], s=[$3]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, s]) -+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) ++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -307,12 +306,12 @@ Calc(select=[c, s]) <![CDATA[ LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) -+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;]) ++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -322,12 +321,12 @@ Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRIN <![CDATA[ LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) -+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;]) ++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER]) +Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(*org.apache.flink.table.planner.utils.PojoTableFunc*(c))], select=[a,b,c,name,age], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml index 0525902..47f118e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml @@ -25,7 +25,7 @@ LogicalJoin(condition=[=($3, $1)], joinType=[inner]) : +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) : :- LogicalProject(o_rowtime=[AS($0, _UTF-16LE'o_rowtime')], o_comment=[AS($1, _UTF-16LE'o_comment')], o_amount=[AS($2, _UTF-16LE'o_amount')], o_currency=[AS($3, _UTF-16LE'o_currency')], o_secondary_key=[AS($4, _UTF-16LE'o_secondary_key')]) : : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) -: +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)], elementType=[class [Ljava.lang.Object;]) +: +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)]) +- LogicalTableScan(table=[[default_catalog, default_database, ThirdTable]]) ]]> </Resource> @@ -53,7 +53,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')]) +- LogicalFilter(condition=[=($3, $1)]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)]) ]]> </Resource> <Resource name="optimized exec plan"> @@ -74,7 +74,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')]) +- LogicalFilter(condition=[=($3, $1)]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) - +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)]) + +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)]) ]]> </Resource> <Resource name="optimized exec plan"> @@ -95,7 +95,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')]) +- LogicalFilter(condition=[=($3, $1)]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, ProctimeOrders]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)]) ]]> </Resource> <Resource name="optimized exec plan"> @@ -117,7 +117,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')]) +- LogicalFilter(condition=[=($3, $1)]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) - +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;]) + +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)]) ]]> </Resource> <Resource name="optimized exec plan"> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala index ca28de3..cddc89d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.planner.plan.batch.sql -import org.apache.flink.table.api.config.TableConfigOptions import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.utils._ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala index 14348ae..8ebaee6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala @@ -34,8 +34,6 @@ class CorrelateTest extends TableTestBase { val util = batchTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val func = new TableFunc1 - util.addFunction("func1", func) - val result1 = table.joinLateral(func('c) as 's).select('c, 's) util.verifyExecPlan(result1) @@ -46,8 +44,6 @@ class CorrelateTest extends TableTestBase { val util = batchTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val func = new TableFunc1 - util.addFunction("func1", func) - val result2 = table.joinLateral(func('c, "$") as 's).select('c, 's) util.verifyExecPlan(result2) } @@ -57,8 +53,6 @@ class CorrelateTest extends TableTestBase { val util = batchTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val func = new TableFunc1 - util.addFunction("func1", func) - val result = table.leftOuterJoinLateral(func('c) as 's).select('c, 's).where('s > "") util.verifyExecPlan(result) } @@ -68,8 +62,6 @@ class CorrelateTest extends TableTestBase { val util = batchTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val func = new TableFunc1 - util.addFunction("func1", func) - val result = table.leftOuterJoinLateral(func('c) as 's, true).select('c, 's) util.verifyExecPlan(result) } @@ -79,8 +71,6 @@ class CorrelateTest extends TableTestBase { val util = batchTestUtil() val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val func = new TableFunc0 - util.addFunction("func1", func) - val result = sourceTable.select('a, 'b, 'c) .joinLateral(func('c) as('d, 'e)) .select('c, 'd, 'e) @@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase { val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val func = new TableFunc0 - util.addFunction("func1", func) - val result = sourceTable.select('a, 'b, 'c) .joinLateral(func('c) as('d, 'e)) .select('c, 'd, 'e) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala index 95b3405..a4aa185 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala @@ -29,9 +29,7 @@ class CorrelateStringExpressionTest extends TableTestBase { private val util = batchTestUtil() private val tab = util.addTableSource[(Int, Long, String)]("Table1", 'a, 'b, 'c) private val func1 = new TableFunc1 - util.addFunction("func1", func1) private val func2 = new TableFunc2 - util.addFunction("func2", func2) @Test def testCorrelateJoins1(): Unit = { @@ -61,7 +59,6 @@ class CorrelateStringExpressionTest extends TableTestBase { def testCorrelateJoins5(): Unit = { // test hierarchy generic type val hierarchy = new HierarchyTableFunction - util.addFunction("hierarchy", hierarchy) val scalaTable = tab.joinLateral( hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult) util.verifyExecPlan(scalaTable) @@ -71,7 +68,6 @@ class CorrelateStringExpressionTest extends TableTestBase { def testCorrelateJoins6(): Unit = { // test pojo type val pojo = new PojoTableFunc - util.addFunction("pojo", pojo) val scalaTable = tab.joinLateral(pojo('c)).select('c, 'name, 'age) util.verifyExecPlan(scalaTable) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala index d69383e..611928e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala @@ -36,7 +36,6 @@ class CorrelateValidationTest extends TableTestBase { val util = batchTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val func = new TableFunc1 - util.addFunction("func1", func) val result = table .leftOuterJoinLateral(func('c) as 's, 'c === 's) .select('c, 's) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala index 339e7ca..ff1be68 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala @@ -36,8 +36,6 @@ class CorrelateTest extends TableTestBase { val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc1 - util.addFunction("func1", function) - val result1 = table.joinLateral(function('c) as 's).select('c, 's) util.verifyExecPlan(result1) } @@ -48,7 +46,6 @@ class CorrelateTest extends TableTestBase { val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc1 - util.addFunction("func1", function) // test overloading val result2 = table.joinLateral(function('c, "$") as 's).select('c, 's) util.verifyExecPlan(result2) @@ -59,8 +56,6 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc1 - util.addFunction("func1", function) - val result = table.leftOuterJoinLateral(function('c) as 's, true).select('c, 's) util.verifyExecPlan(result) } @@ -70,7 +65,6 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc2 - util.addFunction("func2", function) val scalarFunc = new Func13("pre") val result = table.joinLateral( @@ -84,8 +78,6 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new HierarchyTableFunction - util.addFunction("hierarchy", function) - val result = table.joinLateral(function('c) as ('name, 'adult, 'len)) util.verifyExecPlan(result) } @@ -95,8 +87,6 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new PojoTableFunc - util.addFunction("pojo", function) - val result = table.joinLateral(function('c)) util.verifyExecPlan(result) } @@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc2 - util.addFunction("func2", function) - val result = table .joinLateral(function('c) as ('name, 'len)) .select('c, 'name, 'len) @@ -120,8 +108,6 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc1 - util.addFunction("func1", function) - val result = table.joinLateral(function('c.substring(2)) as 's) util.verifyExecPlan(result) } @@ -131,8 +117,6 @@ class CorrelateTest extends TableTestBase { val util = streamTestUtil() val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc0 - util.addFunction("func1", function) - val result = sourceTable.select('a, 'b, 'c) .joinLateral(function('c) as('d, 'e)) .select('c, 'd, 'e) @@ -158,7 +142,6 @@ class CorrelateTest extends TableTestBase { val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) val function = new TableFunc0 - util.addFunction("func1", function) val result = sourceTable.select('a, 'b, 'c) .joinLateral(function('c) as('d, 'e)) .select('c, 'd, 'e) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala index ec2401f..a991cc5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala @@ -68,11 +68,10 @@ class CorrelateValidationTest extends TableTestBase { util.addFunction("func0", Func0) // SQL API call - // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug expectExceptionThrown( util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"), null, - classOf[AssertionError]) + classOf[ValidationException]) //========== throw exception when the parameters is not correct =============== // Java Table API call diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala index 17a35c2..a125f93 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala @@ -222,7 +222,8 @@ class CorrelateITCase extends BatchTestBase { 'a.cast(DataTypes.TINYINT) as 'a, 'a.cast(DataTypes.SMALLINT) as 'b, 'b.cast(DataTypes.FLOAT) as 'c) - .joinLateral(tFunc('a, 'b, 'c) as ('a2, 'b2, 'c2)) + .joinLateral( + tFunc('a.ifNull(0.toByte), 'b.ifNull(0.toShort), 'c.ifNull(0.toFloat)) as ('a2, 'b2, 'c2)) val results = executeQuery(result) val expected = Seq( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala index 9f106c4..683543b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala @@ -25,6 +25,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.test.util.AbstractTestBase import org.apache.flink.types.Row +import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.junit.rules.{ExpectedException, TemporaryFolder} import org.junit.{After, Before, Rule} @@ -44,6 +45,7 @@ class StreamingTestBase extends AbstractTestBase { def tempFolder: TemporaryFolder = _tempFolder @Before + @BeforeEach def before(): Unit = { this.env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(4) @@ -55,6 +57,7 @@ class StreamingTestBase extends AbstractTestBase { } @After + @AfterEach def after(): Unit = { StreamTestSink.clear() TestValuesTableFactory.clearAllData() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala index f087d5a..6fcad3d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala @@ -29,8 +29,6 @@ import org.apache.flink.types.Row import org.junit.Assert -import java.lang.Boolean - import scala.annotation.varargs @@ -117,9 +115,10 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[ } @SerialVersionUID(1L) +@DataTypeHint("ROW<x INT, y INT>") class MockPythonTableFunction extends TableFunction[Row] with PythonFunction { - def eval(x: Int, y: Int) = ??? + def eval(x: java.lang.Integer, y: java.lang.Integer) = ??? override def getResultType: TypeInformation[Row] = new RowTypeInfo(Types.INT, Types.INT) @@ -368,28 +367,19 @@ class MockPythonTableFunction extends TableFunction[Row] with PythonFunction { //} @SerialVersionUID(1L) +@DataTypeHint("ROW<f0 STRING, f1 STRING, f2 STRING>") class TableFunc4 extends TableFunction[Row] { def eval(b: Byte, s: Short, f: Float): Unit = { collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f)) } - - override def getResultType: TypeInformation[Row] = { - new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING) - } } @SerialVersionUID(1L) +@DataTypeHint("ROW<a INT, b INT, c INT>") class TableFunc6 extends TableFunction[Row] { - def eval(row: Row): Unit = { + def eval(@DataTypeHint("ROW<a INT, b INT, c INT>") row: Row): Unit = { collect(row) } - - override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = - Array(new RowTypeInfo(Types.INT, Types.INT, Types.INT)) - - override def getResultType: TypeInformation[Row] = { - new RowTypeInfo(Types.INT, Types.INT, Types.INT) - } } @SerialVersionUID(1L) @@ -421,12 +411,12 @@ class VarArgsFunc0 extends TableFunction[String] { } @SerialVersionUID(1L) -class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] { +class HierarchyTableFunction extends SplittableTableFunction[java.lang.Boolean, Integer] { def eval(user: String) { if (user.contains("#")) { val splits = user.split("#") val age = splits(1).toInt - collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age)) + collect(new Tuple3[String, java.lang.Boolean, Integer](splits(0), age >= 20, age)) } } }
