This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fabff4f38a56b29400b644d370e99cb8a398e8bb Author: yuzhao.cyz <[email protected]> AuthorDate: Tue Mar 17 21:35:48 2020 +0800 [FLINK-14338][table-planner][table-planner-blink] Update all kinds of left plan changes * Some join orders change for blink-planner due to the rule firing sequence changes (see https://github.com/apache/calcite/commit/35caa059a762094c7df0b30e9b51358a19b48ac2); the plans are still correct * The Correlate row count estimation has been fixed from a constant 1 to a join-like estimation; thus, if an input of a Join is a Correlate, the join algorithm will very likely change, e.g. batch.sql.SubplanReuseTest * Due to CALCITE-3729, the filter condition was pushed down for some Join cases: batch.sql.join.JoinReorderTest * Due to CALCITE-2450 RexNode normalization, the predicate order of some tests changes: logical.subquery.FlinkRewriteSubQueryRuleTest * The Decimal modulus precision inference has been fixed: planner.expressions.DecimalTypeTest --- .../src/main/codegen/data/Parser.tdd | 1 + flink-table/flink-table-planner-blink/pom.xml | 5 + .../src/main/resources/META-INF/NOTICE | 3 + .../table/planner/calcite/FlinkRelFactories.scala | 16 ++- .../WindowAggregateReduceFunctionsRule.scala | 18 +-- .../batch/BatchExecWindowAggregateRule.scala | 20 +--- .../planner/plan/batch/sql/SubplanReuseTest.xml | 41 +++---- .../plan/batch/sql/agg/HashAggregateTest.xml | 6 +- .../plan/batch/sql/agg/SortAggregateTest.xml | 6 +- .../sql/join/BroadcastHashSemiAntiJoinTest.xml | 40 +++---- .../plan/batch/sql/join/JoinReorderTest.xml | 85 +++++++------- .../batch/sql/join/NestedLoopSemiAntiJoinTest.xml | 82 +++++++------ .../plan/batch/sql/join/SemiAntiJoinTest.xml | 126 ++++++++++---------- .../sql/join/ShuffledHashSemiAntiJoinTest.xml | 130 +++++++++++++++++---- .../batch/sql/join/SortMergeSemiAntiJoinTest.xml | 45 ++++--- .../JoinDependentConditionDerivationRuleTest.xml | 14 +-- .../subquery/FlinkRewriteSubQueryRuleTest.xml | 2 +- 
.../logical/subquery/SubQueryAntiJoinTest.xml | 24 ++-- .../planner/plan/stream/sql/agg/AggregateTest.xml | 2 +- .../plan/stream/sql/agg/TwoStageAggregateTest.xml | 2 +- .../plan/stream/sql/join/SemiAntiJoinTest.xml | 86 +++++++------- .../planner/plan/stream/table/AggregateTest.xml | 4 +- .../plan/stream/table/TwoStageAggregateTest.xml | 4 +- .../planner/expressions/DecimalTypeTest.scala | 6 +- .../sql/join/ShuffledHashSemiAntiJoinTest.scala | 21 ---- .../planner/plan/utils/FlinkRexUtilTest.scala | 6 +- .../catalog/QueryOperationCatalogViewTable.java | 30 ++++- .../table/operations/PlannerQueryOperation.java | 20 +++- .../table/api/batch/sql/GroupWindowTest.scala | 6 +- .../table/api/batch/sql/SetOperatorsTest.scala | 12 +- .../flink/table/api/batch/table/CalcTest.scala | 2 +- .../table/api/stream/sql/GroupWindowTest.scala | 10 +- .../flink/table/api/stream/sql/JoinTest.scala | 2 +- .../api/stream/sql/TemporalTableJoinTest.scala | 17 +-- .../api/stream/table/TemporalTableJoinTest.scala | 17 +-- .../flink/table/plan/RexProgramExtractorTest.scala | 4 +- .../resources/testSqlUpdateAndToDataStream.out | 2 +- 37 files changed, 496 insertions(+), 421 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 14158c7..f11f5d9 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -68,6 +68,7 @@ # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved # keyword, please also add it to 'nonReservedKeywords' section. + # Please keep the keyword in alphabetical order if new keyword is added. 
keywords: [ "BYTES" "CATALOGS" diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml index b61f7d1..0bb7aff 100644 --- a/flink-table/flink-table-planner-blink/pom.xml +++ b/flink-table/flink-table-planner-blink/pom.xml @@ -320,6 +320,7 @@ under the License. <include>com.fasterxml.jackson.core:jackson-databind</include> <include>com.fasterxml.jackson.core:jackson-annotations</include> <include>commons-codec:commons-codec</include> + <include>commons-io:commons-io</include> <!-- flink-table-planner-blink dependencies --> <include>org.apache.flink:flink-sql-parser</include> @@ -352,6 +353,10 @@ under the License. <pattern>org.apache.commons.codec</pattern> <shadedPattern>org.apache.flink.calcite.shaded.org.apache.commons.codec</shadedPattern> </relocation> + <relocation> + <pattern>org.apache.commons.io</pattern> + <shadedPattern>org.apache.flink.calcite.shaded.org.apache.commons.io</shadedPattern> + </relocation> <!-- flink-table-planner dependencies --> <!-- not relocated for now, because we need to change the contents of the properties field otherwise --> diff --git a/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE index 381d6d5..e1246a4 100644 --- a/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE +++ b/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE @@ -12,6 +12,9 @@ This project bundles the following dependencies under the Apache Software Licens - com.fasterxml.jackson.core:jackson-databind:2.10.1 - com.jayway.jsonpath:json-path:2.4.0 - joda-time:joda-time:2.5 +- net.minidev:json-smart:jar:2.3 +- net.minidev:accessors-smart:jar:1.2 +- org.ow2.asm:asm:jar:5.0.4 - org.apache.calcite:calcite-core:1.22.0 - org.apache.calcite:calcite-linq4j:1.22.0 - org.apache.calcite.avatica:avatica-core:1.16.0 diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala index 3af65df..1f002b2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala @@ -24,9 +24,10 @@ import org.apache.flink.table.sinks.TableSink import org.apache.calcite.plan.Contexts import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} +import org.apache.calcite.rel.core.RelFactories import org.apache.calcite.rel.{RelCollation, RelNode} import org.apache.calcite.rex.RexNode -import org.apache.calcite.tools.RelBuilderFactory +import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory} import org.apache.calcite.util.ImmutableBitSet import java.util @@ -38,6 +39,19 @@ object FlinkRelFactories { val FLINK_REL_BUILDER: RelBuilderFactory = FlinkRelBuilder.proto(Contexts.empty) + // Because of: + // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input, + // if the input is a Project. + // + // the field can not be pruned if it is referenced by other expressions + // of the window aggregation(i.e. the TUMBLE_START/END). + // To solve this, we config the RelBuilder to forbidden this feature. 
+ val LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE: RelBuilderFactory = RelBuilder.proto( + Contexts.of( + RelFactories.DEFAULT_STRUCT, + RelBuilder.Config.DEFAULT + .withPruneInputOfAggregate(false))) + val DEFAULT_EXPAND_FACTORY = new ExpandFactoryImpl val DEFAULT_RANK_FACTORY = new RankFactoryImpl diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala index f6d7f9e..0af6c63 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala @@ -18,12 +18,12 @@ package org.apache.flink.table.planner.plan.rules.logical +import org.apache.flink.table.planner.calcite.FlinkRelFactories import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate -import org.apache.calcite.plan.Contexts import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories} +import org.apache.calcite.rel.core.{Aggregate, AggregateCall} import org.apache.calcite.rel.logical.LogicalAggregate import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule import org.apache.calcite.rex.RexNode @@ -40,11 +40,7 @@ import scala.collection.JavaConversions._ class WindowAggregateReduceFunctionsRule extends AggregateReduceFunctionsRule( operand(classOf[LogicalWindowAggregate], any()), - RelBuilder.proto( - Contexts.of( - RelFactories.DEFAULT_STRUCT, - RelBuilder.Config.DEFAULT - .withPruneInputOfAggregate(false)))) { + FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE) { override def 
newAggregateRel( relBuilder: RelBuilder, @@ -52,14 +48,6 @@ class WindowAggregateReduceFunctionsRule newCalls: util.List[AggregateCall]): Unit = { // create a LogicalAggregate with simpler aggregation functions - - // Because of: - // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input, - // if the input is a Project. - // - // the field can not be pruned if it is referenced by other expressions - // of the window aggregation(i.e. the TUMBLE_START/END). - // To solve this, we config the RelBuilder to forbidden this feature. super.newAggregateRel(relBuilder, oldAgg, newCalls) // pop LogicalAggregate from RelBuilder val newAgg = relBuilder.build().asInstanceOf[LogicalAggregate] diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala index bf430cb..847f29b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction} -import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory} +import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelFactories, FlinkTypeFactory} import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import 
org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow} @@ -34,13 +34,12 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat import org.apache.flink.table.types.logical.{BigIntType, IntType, LogicalType} import org.apache.calcite.plan.RelOptRule._ -import org.apache.calcite.plan.{Contexts, RelOptRule, RelOptRuleCall} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Aggregate.Group -import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories} +import org.apache.calcite.rel.core.{Aggregate, AggregateCall} import org.apache.calcite.rel.{RelCollations, RelNode} import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.calcite.tools.RelBuilder import org.apache.commons.math3.util.ArithmeticUtils import scala.collection.JavaConversions._ @@ -71,11 +70,7 @@ class BatchExecWindowAggregateRule extends RelOptRule( operand(classOf[FlinkLogicalWindowAggregate], operand(classOf[RelNode], any)), - RelBuilder.proto( - Contexts.of( - RelFactories.DEFAULT_STRUCT, - RelBuilder.Config.DEFAULT - .withPruneInputOfAggregate(false))), + FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE, "BatchExecWindowAggregateRule") with BatchExecAggRuleBase { @@ -163,13 +158,6 @@ class BatchExecWindowAggregateRule // TODO aggregate include projection now, so do not provide new trait will be safe val aggProvidedTraitSet = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL) - // Because of: - // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input, - // if the input is a Project. - // - // the field can not be pruned if it is referenced by other expressions - // of the window aggregation(i.e. the TUMBLE_START/END). - // To solve this, we config the RelBuilder to forbidden this feature. 
val inputTimeFieldIndex = AggregateUtil.timeFieldIndex( input.getRowType, call.builder(), window.timeAttribute) val inputTimeFieldType = agg.getInput.getRowType.getFieldList.get(inputTimeFieldIndex).getType diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml index d36ed2b..157aadbb 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml @@ -426,10 +426,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], v=[$3], a0=[$4], b0=[$5], c0=[$6], v0=[$7 </Resource> <Resource name="planAfter"> <![CDATA[ -HashJoin(joinType=[InnerJoin], where=[=(f0, f00)], select=[a, b, c, f0, a0, b0, c0, f00], isBroadcast=[true], build=[right]) -:- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,_UTF-16LE'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) -: +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -+- Exchange(distribution=[broadcast]) +HashJoin(joinType=[InnerJoin], where=[=(f0, f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right]) +:- Exchange(distribution=[hash[f0]]) +: +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,_UTF-16LE'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +: +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[f0]]) +- 
Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], correlate=[table(str_split($cor1.c,_UTF-16LE'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -459,12 +460,13 @@ LogicalProject(a=[$0], b=[$1], c=[$2], s=[$3], a0=[$4], b0=[$5], c0=[$6], s0=[$7 </Resource> <Resource name="planAfter"> <![CDATA[ -NestedLoopJoin(joinType=[InnerJoin], where=[=(c, f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[left]) -:- Exchange(distribution=[broadcast]) -: +- Correlate(invocation=[TableFun($cor0.c)], correlate=[table(TableFun($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +HashJoin(joinType=[InnerJoin], where=[=(c, f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right]) +:- Correlate(invocation=[TableFun($cor0.c)], correlate=[table(TableFun($cor0.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +: +- Exchange(distribution=[hash[c]]) : +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -+- Correlate(invocation=[TableFun($cor1.c)], correlate=[table(TableFun($cor1.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) - +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[f0]]) + +- Correlate(invocation=[TableFun($cor1.c)], correlate=[table(TableFun($cor1.c))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- TableSourceScan(table=[[default_catalog, 
default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> @@ -1093,17 +1095,16 @@ LogicalIntersect(all=[false]) NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right]) :- SortAggregate(isMerge=[false], groupBy=[random], select=[random]) : +- Sort(orderBy=[random ASC]) -: +- Exchange(distribution=[hash[random]]) -: +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right]) -: :- Exchange(distribution=[any], shuffle_mode=[BATCH]) -: : +- Calc(select=[random], reuse_id=[1]) -: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true]) -: : +- Exchange(distribution=[single]) -: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[false]) -: : +- Calc(select=[a AS random, RAND() AS EXPR$1]) -: : +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: +- Exchange(distribution=[broadcast], reuse_id=[2]) -: +- Reused(reference_id=[1]) +: +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right]) +: :- Exchange(distribution=[hash[random]], shuffle_mode=[BATCH]) +: : +- Calc(select=[random], reuse_id=[1]) +: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true]) +: : +- Exchange(distribution=[single]) +: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[false]) +: : +- Calc(select=[a AS random, RAND() AS EXPR$1]) +: : +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +: +- Exchange(distribution=[broadcast], reuse_id=[2]) +: +- Reused(reference_id=[1]) +- Reused(reference_id=[2]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml index 062a047..73f0151 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml @@ -717,7 +717,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c]) +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1]) +- Exchange(distribution=[hash[a]]) +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0]) - +- Calc(select=[a, _UTF-16LE'test' AS c, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -741,7 +741,7 @@ LogicalProject(a=[$0], EXPR$1=[$2], c=[$1]) Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c]) +- HashAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1]) +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, _UTF-16LE'test' AS c, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -766,7 +766,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c]) +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS EXPR$1]) +- Exchange(distribution=[hash[a]]) +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0]) - +- Calc(select=[a, _UTF-16LE'test' AS c, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml index b9b901d..3f3e03f 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml @@ -737,7 +737,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c]) +- Exchange(distribution=[hash[a]]) +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0]) +- Sort(orderBy=[a ASC]) - +- Calc(select=[a, _UTF-16LE'test' AS c, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -762,7 +762,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c]) +- SortAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1]) +- Sort(orderBy=[a ASC]) +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, _UTF-16LE'test' AS c, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -789,7 +789,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c]) +- Exchange(distribution=[hash[a]]) +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0]) +- Sort(orderBy=[a ASC]) - +- Calc(select=[a, _UTF-16LE'test' AS c, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml index 
cd2c9eb..cbd5d3e 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml @@ -1399,19 +1399,19 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))]) </Resource> <Resource name="planAfter"> <![CDATA[ -HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], isBroadcast=[true], build=[left]) -:- Exchange(distribution=[broadcast]) -: +- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], isBroadcast=[true], build=[right]) -: :- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], isBroadcast=[true], build=[right]) -: : :- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) -: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: : +- Exchange(distribution=[broadcast]) -: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) -: +- Exchange(distribution=[broadcast]) -: +- Calc(select=[i], where=[<(j, 100)]) -: +- Reused(reference_id=[1]) -+- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], isBroadcast=[true], build=[right]) +:- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, f], isBroadcast=[true], build=[right]) +: :- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], isBroadcast=[true], build=[left]) +: : :- Exchange(distribution=[broadcast]) +: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) +: : : +- TableSourceScan(table=[[default_catalog, 
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +: : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +: +- Exchange(distribution=[broadcast]) +: +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) +: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) ++- Exchange(distribution=[broadcast]) + +- Calc(select=[i], where=[<(j, 100)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -1854,28 +1854,26 @@ Calc(select=[b]) : : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : : : +- Exchange(distribution=[single]) : : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1]) - : : : : +- Calc(select=[i]) - : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : : : +- Calc(select=[i], reuse_id=[1]) + : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2]) : : : +- Exchange(distribution=[broadcast]) : : : +- Calc(select=[i, true AS i0]) : : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i]) : : : +- Exchange(distribution=[hash[i]]) : : : +- LocalHashAggregate(groupBy=[i], select=[i]) - : : : +- Calc(select=[i, true AS i0]) - : : : +- Reused(reference_id=[1]) + : : : +- Reused(reference_id=[1]) : : +- Exchange(distribution=[broadcast]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1]) - : : +- Calc(select=[CAST(j) AS EXPR$0]) - : : +- Reused(reference_id=[1]) + : : +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3]) + : : 
+- Reused(reference_id=[2]) : +- Exchange(distribution=[broadcast]) : +- Calc(select=[EXPR$0, true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[CAST(j) AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[3]) +- Exchange(distribution=[broadcast]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml index 8fe210d..b4fc401 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml @@ -174,17 +174,16 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3 <![CDATA[ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) +- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) - :- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) : +- Exchange(distribution=[broadcast]) - : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, 
a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) : +- Exchange(distribution=[broadcast]) - : +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)]) - : +- HashJoin(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) - : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) - : +- Exchange(distribution=[broadcast]) - : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +- Exchange(distribution=[broadcast]) +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ]]> @@ -216,16 +215,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3 <![CDATA[ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) +- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right]) - :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], 
select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[left]) : :- Exchange(distribution=[hash[b1]]) : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) : +- Exchange(distribution=[hash[b4]]) : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right]) - : :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)]) - : : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right]) - : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) - : : +- Exchange(distribution=[broadcast]) - : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : :- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right]) + : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : : +- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) : +- Exchange(distribution=[broadcast]) : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +- Exchange(distribution=[broadcast]) @@ -270,11 +268,10 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) : +- Exchange(distribution=[single]) : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +- Exchange(distribution=[broadcast]) - +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)]) - +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, 
c5, a4, b4, c4], isBroadcast=[true], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) + :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ]]> </Resource> </TestCase> @@ -348,7 +345,7 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3 <Resource name="planAfter"> <![CDATA[ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) -+- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], build=[right]) ++- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right]) :- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) : :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right]) : : :- Exchange(distribution=[hash[a1]]) @@ -356,11 +353,10 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) : : +- Exchange(distribution=[hash[a2]]) : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) : +- Exchange(distribution=[broadcast]) - : +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)]) - : +- 
HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) - : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) - : +- Exchange(distribution=[broadcast]) - : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +- Exchange(distribution=[broadcast]) +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) ]]> @@ -408,11 +404,10 @@ Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5]) : :- Calc(select=[a5, b5, c5], where=[<(b5, 15)]) : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) : +- Exchange(distribution=[broadcast]) - : +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)]) - : +- HashJoin(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) - : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) - : +- Exchange(distribution=[broadcast]) - : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + : +- HashJoin(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], 
build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +- Exchange(distribution=[broadcast]) +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ]]> @@ -538,16 +533,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3 <![CDATA[ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) +- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right]) - :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[left]) : :- Exchange(distribution=[hash[b1]]) : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) : +- Exchange(distribution=[hash[b4]]) : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right]) - : :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)]) - : : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right]) - : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) - : : +- Exchange(distribution=[broadcast]) - : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : :- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], 
select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right]) + : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : : +- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) : +- Exchange(distribution=[broadcast]) : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +- Exchange(distribution=[broadcast]) @@ -583,17 +577,16 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) +- HashJoin(joinType=[InnerJoin], where=[=(c1, c2)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) +- Exchange(distribution=[broadcast]) - +- HashJoin(joinType=[InnerJoin], where=[=(c2, c5)], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) - +- Exchange(distribution=[broadcast]) - +- HashJoin(joinType=[InnerJoin], where=[=(c3, c4)], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) - :- Calc(select=[a5, b5, c5, a3, b3, c3], where=[=(c5, c3)]) - : +- HashJoin(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], isBroadcast=[true], build=[right]) - : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) - : +- Exchange(distribution=[broadcast]) - : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) - +- Exchange(distribution=[broadcast]) - +- 
TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- HashJoin(joinType=[InnerJoin], where=[=(c2, c5)], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + +- HashJoin(joinType=[InnerJoin], where=[=(c3, c4)], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml index fa78f5d..02364c1 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml @@ -508,27 +508,27 @@ LogicalFilter(condition=[<>($cor0.b, $1)]) <Resource name="planAfter"> <![CDATA[ NestedLoopJoin(joinType=[LeftAntiJoin], where=[<>(b, e)], select=[a, b, c], 
build=[right]) -:- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], build=[right]) -: :- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], build=[right], singleRowJoin=[true]) +:- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], build=[right], singleRowJoin=[true]) +: :- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], build=[right]) : : :- NestedLoopJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[right]) : : : :- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) : : : +- Exchange(distribution=[broadcast]) : : : +- Calc(select=[d]) : : : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], reuse_id=[1]) : : +- Exchange(distribution=[broadcast]) -: : +- Calc(select=[IS NOT NULL(m) AS $f0]) -: : +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m]) -: : +- Exchange(distribution=[single]) -: : +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0]) -: : +- Calc(select=[true AS i]) -: : +- HashAggregate(isMerge=[true], groupBy=[l], select=[l]) -: : +- Exchange(distribution=[hash[l]]) -: : +- LocalHashAggregate(groupBy=[l], select=[l]) -: : +- Calc(select=[l], where=[LIKE(n, _UTF-16LE'Test')]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n]) +: : +- Calc(select=[i, k], where=[>(i, 10)]) +: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) : +- Exchange(distribution=[broadcast]) -: +- Calc(select=[i, k], where=[>(i, 10)]) -: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) +: +- Calc(select=[IS NOT NULL(m) AS 
$f0]) +: +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0]) +: +- Calc(select=[true AS i]) +: +- HashAggregate(isMerge=[true], groupBy=[l], select=[l]) +: +- Exchange(distribution=[hash[l]]) +: +- LocalHashAggregate(groupBy=[l], select=[l]) +: +- Calc(select=[l], where=[LIKE(n, _UTF-16LE'Test')]) +: +- TableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n]) +- Exchange(distribution=[broadcast]) +- Calc(select=[e]) +- Reused(reference_id=[1]) @@ -679,15 +679,14 @@ Calc(select=[b]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]) - : : +- Calc(select=[1 AS EXPR$0]) - : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : +- Calc(select=[1 AS EXPR$0], reuse_id=[1]) + : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) : +- Exchange(distribution=[broadcast]) : +- Calc(select=[true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[1 AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[1]) +- Exchange(distribution=[broadcast]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -1204,15 +1203,14 @@ Calc(select=[b]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]) - : : +- Calc(select=[1 AS EXPR$0]) - : : +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : +- Calc(select=[1 AS EXPR$0], reuse_id=[1]) + : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) : +- Exchange(distribution=[broadcast]) : +- Calc(select=[true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[1 AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[1]) +- Exchange(distribution=[broadcast]) +- Calc(select=[d]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -1872,19 +1870,19 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))]) </Resource> <Resource name="planAfter"> <![CDATA[ -NestedLoopJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left]) -:- Exchange(distribution=[broadcast]) -: +- NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], build=[right]) -: :- NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], build=[right]) -: : :- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) -: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: : +- Exchange(distribution=[broadcast]) -: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) -: +- Exchange(distribution=[broadcast]) -: +- Calc(select=[i], where=[<(j, 100)]) -: +- Reused(reference_id=[1]) -+- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
+NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], build=[right]) +:- NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, f], build=[right]) +: :- NestedLoopJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left]) +: : :- Exchange(distribution=[broadcast]) +: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +: : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +: +- Exchange(distribution=[broadcast]) +: +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) +: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) ++- Exchange(distribution=[broadcast]) + +- Calc(select=[i], where=[<(j, 100)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -2250,28 +2248,26 @@ Calc(select=[b]) : : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : : : +- Exchange(distribution=[single]) : : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1]) - : : : : +- Calc(select=[i]) - : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : : : +- Calc(select=[i], reuse_id=[1]) + : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2]) : : : +- Exchange(distribution=[broadcast]) : : : +- Calc(select=[i, true AS i0]) : : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i]) : : : +- Exchange(distribution=[hash[i]]) : : : +- LocalHashAggregate(groupBy=[i], select=[i]) - : : : +- Calc(select=[i, true AS i0]) - : : : +- 
Reused(reference_id=[1]) + : : : +- Reused(reference_id=[1]) : : +- Exchange(distribution=[broadcast]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1]) - : : +- Calc(select=[CAST(j) AS EXPR$0]) - : : +- Reused(reference_id=[1]) + : : +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3]) + : : +- Reused(reference_id=[2]) : +- Exchange(distribution=[broadcast]) : +- Calc(select=[EXPR$0, true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[CAST(j) AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[3]) +- Exchange(distribution=[broadcast]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml index a9dfe9d..52e98cc 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml @@ -100,9 +100,10 @@ LogicalFilter(condition=[=($cor1.a, $0)]) </Resource> <Resource name="planAfter"> <![CDATA[ -HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], isBroadcast=[true], build=[right]) -:- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -+- 
Exchange(distribution=[broadcast]) +HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[right]) +:- Exchange(distribution=[hash[a]]) +: +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[d]]) +- Calc(select=[d]) +- Correlate(invocation=[table_func($cor0.f)], correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, VARCHAR(2147483647) f0)], joinType=[INNER]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -516,29 +517,29 @@ LogicalFilter(condition=[<>($cor0.b, $1)]) <Resource name="planAfter"> <![CDATA[ NestedLoopJoin(joinType=[LeftAntiJoin], where=[<>(b, e)], select=[a, b, c], build=[right]) -:- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], build=[left]) -: :- Exchange(distribution=[hash[c]]) -: : +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], build=[right], singleRowJoin=[true]) -: : :- HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[right]) -: : : :- Exchange(distribution=[hash[a]]) -: : : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: : : +- Exchange(distribution=[hash[d]]) -: : : +- Calc(select=[d]) -: : : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], reuse_id=[1]) -: : +- Exchange(distribution=[broadcast]) -: : +- Calc(select=[IS NOT NULL(m) AS $f0]) -: : +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m]) -: : +- Exchange(distribution=[single]) -: : +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0]) -: : +- Calc(select=[true AS i]) -: : +- HashAggregate(isMerge=[true], groupBy=[l], select=[l]) -: : +- 
Exchange(distribution=[hash[l]]) -: : +- LocalHashAggregate(groupBy=[l], select=[l]) -: : +- Calc(select=[l], where=[LIKE(n, _UTF-16LE'Test')]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n]) -: +- Exchange(distribution=[hash[k]]) -: +- Calc(select=[i, k], where=[>(i, 10)]) -: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) +:- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], build=[right], singleRowJoin=[true]) +: :- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], build=[right]) +: : :- Exchange(distribution=[hash[c]]) +: : : +- HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[right]) +: : : :- Exchange(distribution=[hash[a]]) +: : : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +: : : +- Exchange(distribution=[hash[d]]) +: : : +- Calc(select=[d]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], reuse_id=[1]) +: : +- Exchange(distribution=[hash[k]]) +: : +- Calc(select=[i, k], where=[>(i, 10)]) +: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) +: +- Exchange(distribution=[broadcast]) +: +- Calc(select=[IS NOT NULL(m) AS $f0]) +: +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0]) +: +- Calc(select=[true AS i]) +: +- HashAggregate(isMerge=[true], groupBy=[l], select=[l]) +: +- Exchange(distribution=[hash[l]]) +: +- LocalHashAggregate(groupBy=[l], select=[l]) +: +- Calc(select=[l], where=[LIKE(n, _UTF-16LE'Test')]) +: +- TableSourceScan(table=[[default_catalog, 
default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n]) +- Exchange(distribution=[broadcast]) +- Calc(select=[e]) +- Reused(reference_id=[1]) @@ -694,15 +695,14 @@ Calc(select=[b]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]) - : : +- Calc(select=[1 AS EXPR$0]) - : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : +- Calc(select=[1 AS EXPR$0], reuse_id=[1]) + : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) : +- Exchange(distribution=[broadcast]) : +- Calc(select=[true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[1 AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[1]) +- Exchange(distribution=[hash[d, f]]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -839,9 +839,10 @@ LogicalProject(f1=[$3]) </Resource> <Resource name="planAfter"> <![CDATA[ -HashJoin(joinType=[LeftSemiJoin], where=[AND(=(c, f1), =(a, d))], select=[a, b, c], isBroadcast=[true], build=[right]) -:- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -+- Exchange(distribution=[broadcast]) +HashJoin(joinType=[LeftSemiJoin], where=[AND(=(c, f1), =(a, d))], select=[a, b, c], build=[right]) +:- Exchange(distribution=[hash[c, a]]) +: +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[f1, d]]) +- Calc(select=[f0 AS f1, d]) +- 
Correlate(invocation=[table_func($cor0.f)], correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, VARCHAR(2147483647) f0)], joinType=[INNER]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -1238,15 +1239,14 @@ Calc(select=[b]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]) - : : +- Calc(select=[1 AS EXPR$0]) - : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : +- Calc(select=[1 AS EXPR$0], reuse_id=[1]) + : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) : +- Exchange(distribution=[broadcast]) : +- Calc(select=[true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[1 AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[1]) +- Exchange(distribution=[hash[d]]) +- Calc(select=[d]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -1427,9 +1427,10 @@ LogicalProject(f1=[$3]) </Resource> <Resource name="planAfter"> <![CDATA[ -HashJoin(joinType=[LeftSemiJoin], where=[=(c, f1)], select=[a, b, c], isBroadcast=[true], build=[right]) -:- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -+- Exchange(distribution=[broadcast]) +HashJoin(joinType=[LeftSemiJoin], where=[=(c, f1)], select=[a, b, c], build=[right]) +:- Exchange(distribution=[hash[c]]) +: +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[f1]]) +- Calc(select=[f0 AS f1]) +- Correlate(invocation=[table_func($cor0.f)], correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, VARCHAR(2147483647) f0)], joinType=[INNER]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -1927,21 +1928,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))]) </Resource> <Resource name="planAfter"> <![CDATA[ -HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left]) -:- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], build=[right]) -: :- Exchange(distribution=[hash[a]]) -: : +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], build=[right]) -: : :- Exchange(distribution=[hash[b]]) -: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) -: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: : +- Exchange(distribution=[hash[j]]) -: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) -: +- Exchange(distribution=[hash[i]]) -: +- Calc(select=[i], where=[<(j, 100)]) -: +- Reused(reference_id=[1]) -+- Exchange(distribution=[hash[d]]) - +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], build=[right]) +:- Exchange(distribution=[hash[a]]) +: +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, f], build=[right]) +: :- Exchange(distribution=[hash[b]]) +: : +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left]) 
+: : :- Exchange(distribution=[hash[a]]) +: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +: : +- Exchange(distribution=[hash[d]]) +: : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +: +- Exchange(distribution=[hash[j]]) +: +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) +: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) ++- Exchange(distribution=[hash[i]]) + +- Calc(select=[i], where=[<(j, 100)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -2315,26 +2317,24 @@ Calc(select=[b]) : : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : : : +- Exchange(distribution=[single]) : : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1]) - : : : : +- Calc(select=[i]) - : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : : : +- Calc(select=[i], reuse_id=[1]) + : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2]) : : : +- Calc(select=[i, true AS i0]) : : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i]) : : : +- Exchange(distribution=[hash[i]]) : : : +- LocalHashAggregate(groupBy=[i], select=[i]) - : : : +- Calc(select=[i, true AS i0]) - : : : +- Reused(reference_id=[1]) + : : : +- Reused(reference_id=[1]) : : +- Exchange(distribution=[broadcast]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS 
count$1]) - : : +- Calc(select=[CAST(j) AS EXPR$0]) - : : +- Reused(reference_id=[1]) + : : +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3]) + : : +- Reused(reference_id=[2]) : +- Calc(select=[EXPR$0, true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[CAST(j) AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[3]) +- Exchange(distribution=[hash[f]]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml index 5100b1c..07ac0fb 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml @@ -82,6 +82,34 @@ HashJoin(joinType=[LeftSemiJoin], where=[AND(=(b, e), =(c, f))], select=[a, b, c ]]> </Resource> </TestCase> + <TestCase name="testExistsWithCorrelated_LateralTableInSubQuery"> + <Resource name="sql"> + <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT * FROM r, LATERAL TABLE(table_func(f)) AS T(f1) WHERE a = d)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[EXISTS({ +LogicalFilter(condition=[=($cor1.a, $0)]) + LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) + LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]]) + 
LogicalTableFunctionScan(invocation=[table_func($cor0.f)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;]) +})], variablesSet=[[$cor1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[right]) +:- Exchange(distribution=[hash[a]]) +: +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[d]]) + +- Calc(select=[d]) + +- Correlate(invocation=[table_func($cor0.f)], correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +]]> + </Resource> + </TestCase> <TestCase name="testExistsWithCorrelated_OverInSubQuery"> <Resource name="sql"> <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT MAX(r.e) OVER() FROM r WHERE l.c = r.f GROUP BY r.e)]]> @@ -639,6 +667,63 @@ Calc(select=[c]) ]]> </Resource> </TestCase> + <TestCase name="testInWithUncorrelated_LateralTableInSubQuery"> + <Resource name="sql"> + <![CDATA[SELECT * FROM l WHERE c IN (SELECT f1 FROM r, LATERAL TABLE(table_func(f)) AS T(f1))]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[IN($2, { +LogicalProject(f1=[$3]) + LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) + LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]]) + LogicalTableFunctionScan(invocation=[table_func($cor0.f)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;]) +})]) + +- 
LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashJoin(joinType=[LeftSemiJoin], where=[=(c, f1)], select=[a, b, c], build=[right]) +:- Exchange(distribution=[hash[c]]) +: +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[f1]]) + +- Calc(select=[f0 AS f1]) + +- Correlate(invocation=[table_func($cor0.f)], correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +]]> + </Resource> + </TestCase> + <TestCase name="testInWithCorrelated_LateralTableInSubQuery"> + <Resource name="sql"> + <![CDATA[SELECT * FROM l WHERE c IN (SELECT f1 FROM r, LATERAL TABLE(table_func(f)) AS T(f1) WHERE a = d)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[IN($2, { +LogicalProject(f1=[$3]) + LogicalFilter(condition=[=($cor1.a, $0)]) + LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) + LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]]) + LogicalTableFunctionScan(invocation=[table_func($cor0.f)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;]) +})], variablesSet=[[$cor1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashJoin(joinType=[LeftSemiJoin], where=[AND(=(c, f1), =(a, d))], select=[a, b, c], build=[right]) +:- Exchange(distribution=[hash[c, a]]) +: +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- Exchange(distribution=[hash[f1, d]]) + +- Calc(select=[f0 AS f1, d]) + +- Correlate(invocation=[table_func($cor0.f)], correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +]]> + </Resource> + </TestCase> <TestCase name="testInWithCorrelated_MultiFields"> <Resource name="sql"> <![CDATA[SELECT * FROM l WHERE (a, SUBSTRING(c, 1, 5)) IN (SELECT d, SUBSTRING(f, 1, 5) FROM r WHERE l.b = r.e)]]> @@ -1435,21 +1520,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))]) </Resource> <Resource name="planAfter"> <![CDATA[ -HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left]) -:- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], build=[right]) -: :- Exchange(distribution=[hash[a]]) -: : +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], build=[right]) -: : :- Exchange(distribution=[hash[b]]) -: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) -: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: : +- Exchange(distribution=[hash[j]]) -: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) -: +- Exchange(distribution=[hash[i]]) -: +- Calc(select=[i], where=[<(j, 100)]) -: +- Reused(reference_id=[1]) -+- Exchange(distribution=[hash[d]]) - +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], build=[right]) +:- Exchange(distribution=[hash[a]]) +: 
+- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, f], build=[right]) +: :- Exchange(distribution=[hash[b]]) +: : +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], build=[left]) +: : :- Exchange(distribution=[hash[a]]) +: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +: : +- Exchange(distribution=[hash[d]]) +: : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +: +- Exchange(distribution=[hash[j]]) +: +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) +: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) ++- Exchange(distribution=[hash[i]]) + +- Calc(select=[i], where=[<(j, 100)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -1909,26 +1995,24 @@ Calc(select=[b]) : : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : : : +- Exchange(distribution=[single]) : : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1]) - : : : : +- Calc(select=[i]) - : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : : : +- Calc(select=[i], reuse_id=[1]) + : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2]) : : : +- Calc(select=[i, true AS i0]) : : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i]) : : : +- Exchange(distribution=[hash[i]]) : : : +- LocalHashAggregate(groupBy=[i], select=[i]) - : : : +- Calc(select=[i, true AS i0]) - : : : +- Reused(reference_id=[1]) + : : : +- Reused(reference_id=[1]) : : +- 
Exchange(distribution=[broadcast]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1]) - : : +- Calc(select=[CAST(j) AS EXPR$0]) - : : +- Reused(reference_id=[1]) + : : +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3]) + : : +- Reused(reference_id=[2]) : +- Calc(select=[EXPR$0, true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[CAST(j) AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[3]) +- Exchange(distribution=[hash[f]]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml index 54bb720..b9c22aa 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml @@ -1520,21 +1520,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))]) </Resource> <Resource name="planAfter"> <![CDATA[ -SortMergeJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f]) -:- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c]) -: :- Exchange(distribution=[hash[a]]) -: : +- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c]) -: : :- 
Exchange(distribution=[hash[b]]) -: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) -: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: : +- Exchange(distribution=[hash[j]]) -: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) -: +- Exchange(distribution=[hash[i]]) -: +- Calc(select=[i], where=[<(j, 100)]) -: +- Reused(reference_id=[1]) -+- Exchange(distribution=[hash[d]]) - +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +SortMergeJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f]) +:- Exchange(distribution=[hash[a]]) +: +- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, f]) +: :- Exchange(distribution=[hash[b]]) +: : +- SortMergeJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f]) +: : :- Exchange(distribution=[hash[a]]) +: : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +: : +- Exchange(distribution=[hash[d]]) +: : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +: +- Exchange(distribution=[hash[j]]) +: +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) +: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) ++- Exchange(distribution=[hash[i]]) + +- Calc(select=[i], where=[<(j, 100)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -1994,26 +1995,24 @@ Calc(select=[b]) : : : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : : : +- 
Exchange(distribution=[single]) : : : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS count$1]) - : : : : +- Calc(select=[i]) - : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : : : +- Calc(select=[i], reuse_id=[1]) + : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2]) : : : +- Calc(select=[i, true AS i0]) : : : +- HashAggregate(isMerge=[true], groupBy=[i], select=[i]) : : : +- Exchange(distribution=[hash[i]]) : : : +- LocalHashAggregate(groupBy=[i], select=[i]) - : : : +- Calc(select=[i, true AS i0]) - : : : +- Reused(reference_id=[1]) + : : : +- Reused(reference_id=[1]) : : +- Exchange(distribution=[broadcast]) : : +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck]) : : +- Exchange(distribution=[single]) : : +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(EXPR$0) AS count$1]) - : : +- Calc(select=[CAST(j) AS EXPR$0]) - : : +- Reused(reference_id=[1]) + : : +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3]) + : : +- Reused(reference_id=[2]) : +- Calc(select=[EXPR$0, true AS i]) : +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) - : +- Calc(select=[CAST(j) AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[3]) +- Exchange(distribution=[hash[f]]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml index ce66d1f..919e964 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml @@ -32,7 +32,7 @@ LogicalProject(a=[$0], d=[$3]) <Resource name="planAfter"> <![CDATA[ LogicalProject(a=[$0], d=[$3]) -+- LogicalJoin(condition=[AND(OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 0), =($0, 1), =($0, 2)))], joinType=[inner]) ++- LogicalJoin(condition=[AND(OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=(0, $0), =(1, $0), =(2, $0)))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]]) ]]> @@ -54,7 +54,7 @@ LogicalProject(a=[$0], d=[$3]) <Resource name="planAfter"> <![CDATA[ LogicalProject(a=[$0], d=[$3]) -+- LogicalJoin(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], joinType=[inner]) ++- LogicalJoin(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=(1, $0), =(2, $0)), OR(=(2, $3), =(1, $3)))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]]) ]]> @@ -98,7 +98,7 @@ LogicalProject(a=[$0], d=[$3]) <Resource name="planAfter"> <![CDATA[ LogicalProject(a=[$0], d=[$3]) -+- 
LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=($0, 1), =($1, 1)), AND(=($0, 2), =($1, 2))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner]) ++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=(1, $0), =(1, $1)), AND(=(2, $0), =(2, $1))), OR(AND(=(2, $3), =(2, $4)), AND(=(1, $3), =(1, $4))))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]]) ]]> @@ -120,7 +120,7 @@ LogicalProject(a=[$0], d=[$3]) <Resource name="planAfter"> <![CDATA[ LogicalProject(a=[$0], d=[$3]) -+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)), OR(=($0, 3), =($0, 4)), OR(=($3, 4), =($3, 3)))], joinType=[inner]) ++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))), OR(=(1, $0), =(2, $0)), OR(=(2, $3), =(1, $3)), OR(=(3, $0), =(4, $0)), OR(=(4, $3), =(3, $3)))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]]) ]]> @@ -152,7 +152,7 @@ LogicalProject(a=[$0], d=[$6]) <Resource name="planAfter"> <![CDATA[ LogicalProject(a=[$0], d=[$6]) -+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=($0, 1), =($1, 1)), =($0, 2)), OR(AND(=($3, 2), =($7, 2)), AND(=($4, 2), =($6, 1), =($7, 1))))], joinType=[inner]) ++- 
LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=(1, $0), =(1, $1)), =(2, $0)), OR(AND(=(2, $3), =(2, $7)), AND(=(2, $4), =(1, $6), =(1, $7))))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]]) +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7]) +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) @@ -177,7 +177,7 @@ LogicalProject(a=[$0], d=[$3]) <Resource name="planAfter"> <![CDATA[ LogicalProject(a=[$0], d=[$3]) -+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner]) ++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))), OR(AND(=(2, $3), =(2, $4)), AND(=(1, $3), =(1, $4))))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]]) ]]> @@ -199,7 +199,7 @@ LogicalProject(a=[$0], d=[$3]) <Resource name="planAfter"> <![CDATA[ LogicalProject(a=[$0], d=[$3]) -+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], joinType=[inner]) ++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=(1, $0), =(2, $0)), OR(=(2, $3), =(1, $3)))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]]) ]]> diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml index 19dfb77..1ec0d26 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml @@ -156,7 +156,7 @@ LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[100]) +- LogicalJoin(condition=[=($7, $1)], joinType=[semi]) :- LogicalTableScan(table=[[default_catalog, default_database, item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]]) +- LogicalProject(i_manufact=[$1]) - +- LogicalFilter(condition=[OR(AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, _UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, _UTF-16LE'extra large'))), AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), =($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF-16LE'floral'), =( [...] + +- LogicalFilter(condition=[OR(AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, _UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, _UTF-16LE'extra large'))), AND(=(_UTF-16LE'Women', $3), OR(=($4, _UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), =($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF-16LE'floral'), =( [...] 
+- LogicalTableScan(table=[[default_catalog, default_database, item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml index 7736dea..856ce59 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml @@ -46,22 +46,22 @@ LogicalFilter(condition=[<>($cor0.b, $1)]) <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2]) +- LogicalJoin(condition=[<>($1, $3)], joinType=[anti]) - :- LogicalJoin(condition=[AND(OR(=($1, $3), IS NULL($1), IS NULL($3)), =($2, $4))], joinType=[anti]) - : :- LogicalJoin(condition=[$3], joinType=[semi]) + :- LogicalJoin(condition=[$3], joinType=[semi]) + : :- LogicalJoin(condition=[AND(OR(=($1, $3), IS NULL($1), IS NULL($3)), =($2, $4))], joinType=[anti]) : : :- LogicalJoin(condition=[=($0, $3)], joinType=[semi]) : : : :- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]]) : : : +- LogicalProject(d=[$0]) : : : +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]]) - : : +- LogicalProject($f0=[IS NOT NULL($0)]) - : : +- LogicalAggregate(group=[{}], m=[MIN($0)]) - : : +- LogicalProject(i=[true]) - : : +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT($0)]) - : : +- LogicalProject(l=[$0]) - : : +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'Test')]) - : : +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, 
n)]]]) - : +- LogicalProject(i=[$0], k=[$2]) - : +- LogicalFilter(condition=[>($0, 10)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]]) + : : +- LogicalProject(i=[$0], k=[$2]) + : : +- LogicalFilter(condition=[>($0, 10)]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]]) + : +- LogicalProject($f0=[IS NOT NULL($0)]) + : +- LogicalAggregate(group=[{}], m=[MIN($0)]) + : +- LogicalProject(i=[true]) + : +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT($0)]) + : +- LogicalProject(l=[$0]) + : +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'Test')]) + : +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, n)]]]) +- LogicalProject(e=[$1]) +- LogicalFilter(condition=[true]) +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml index 390832a..5c176e5 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml @@ -234,7 +234,7 @@ LogicalProject(a=[$0], EXPR$1=[$2], c=[$1]) Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c]) +- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1]) +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, _UTF-16LE'test' AS c, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d]) ]]> </Resource> diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml index ddae158..55029e6 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml @@ -175,7 +175,7 @@ Calc(select=[4 AS four, EXPR$1]) +- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$1]) +- Exchange(distribution=[hash[b]]) +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0]) - +- Calc(select=[b, 4 AS four, a]) + +- Calc(select=[b, a]) +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml index 7ef21d9..c2dfa03 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml @@ -516,28 +516,28 @@ LogicalFilter(condition=[<>($cor0.b, $1)]) <![CDATA[ Join(joinType=[LeftAntiJoin], where=[<>(b, e)], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[single]) -: +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -: :- Exchange(distribution=[hash[c]]) -: : +- Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -: : :- Exchange(distribution=[single]) +: +- Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: :- Exchange(distribution=[single]) +: : +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: : :- Exchange(distribution=[hash[c]]) : : : +- Join(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) : : : :- Exchange(distribution=[hash[a]]) : : : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) : : : +- Exchange(distribution=[hash[d]]) : : : +- Calc(select=[d]) : : : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], reuse_id=[1]) -: : +- Exchange(distribution=[single]) -: : +- Calc(select=[IS NOT NULL(m) AS $f0]) -: : +- GroupAggregate(select=[MIN(i) AS m]) -: : +- Exchange(distribution=[single]) -: : +- Calc(select=[true AS i]) -: : +- GroupAggregate(groupBy=[l], select=[l]) -: : +- Exchange(distribution=[hash[l]]) -: : +- Calc(select=[l], where=[LIKE(n, _UTF-16LE'Test')]) -: : +- TableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n]) -: +- Exchange(distribution=[hash[k]]) -: +- Calc(select=[i, k], where=[>(i, 10)]) -: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) +: : +- Exchange(distribution=[hash[k]]) +: : +- Calc(select=[i, k], where=[>(i, 10)]) +: : +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) +: +- Exchange(distribution=[single]) +: +- Calc(select=[IS NOT NULL(m) AS $f0]) +: +- GroupAggregate(select=[MIN(i) AS m]) +: +- Exchange(distribution=[single]) +: +- Calc(select=[true AS i]) +: +- GroupAggregate(groupBy=[l], select=[l]) +: +- Exchange(distribution=[hash[l]]) +: +- Calc(select=[l], where=[LIKE(n, _UTF-16LE'Test')]) +: +- TableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n]) +- Exchange(distribution=[single]) +- Calc(select=[e]) +- Reused(reference_id=[1]) @@ -692,14 +692,13 @@ Calc(select=[b]) : : +- Exchange(distribution=[single]) : : +- GroupAggregate(select=[COUNT(*) AS c]) : : +- Exchange(distribution=[single]) - : : +- Calc(select=[1 AS EXPR$0]) - : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : +- Calc(select=[1 AS EXPR$0], reuse_id=[1]) + : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) : +- Exchange(distribution=[single]) : +- Calc(select=[true AS i]) : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) - : +- Calc(select=[1 AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[1]) +- Exchange(distribution=[hash[d, f]]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -1235,14 +1234,13 @@ Calc(select=[b]) : : +- Exchange(distribution=[single]) : : +- GroupAggregate(select=[COUNT(*) AS c]) : : +- Exchange(distribution=[single]) - : : +- Calc(select=[1 AS EXPR$0]) - : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : +- Calc(select=[1 AS EXPR$0], reuse_id=[1]) + : : +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k]) : +- Exchange(distribution=[single]) : +- Calc(select=[true AS i]) : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) - : +- Calc(select=[1 AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[1]) +- Exchange(distribution=[hash[d]]) +- Calc(select=[d]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) @@ -1923,22 +1921,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 100))]) </Resource> <Resource name="planAfter"> <![CDATA[ -Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +Join(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) -: +- Join(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -: :- Exchange(distribution=[hash[a]]) -: : +- Join(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -: : :- Exchange(distribution=[hash[b]]) +: +- Join(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: :- Exchange(distribution=[hash[b]]) +: : +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: : :- Exchange(distribution=[hash[a]]) : : : +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)]) : : : +- TableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -: : +- Exchange(distribution=[hash[j]]) -: : +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) -: : +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) -: +- Exchange(distribution=[hash[i]]) -: +- Calc(select=[i], where=[<(j, 100)]) -: +- Reused(reference_id=[1]) -+- Exchange(distribution=[hash[d]]) - +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +: : +- Exchange(distribution=[hash[d]]) +: : +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) +: +- Exchange(distribution=[hash[j]]) +: +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)]) +: +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) ++- Exchange(distribution=[hash[i]]) + +- Calc(select=[i], where=[<(j, 100)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -2287,25 +2285,23 @@ Calc(select=[b]) : : : : +- Exchange(distribution=[single]) : : : : +- GroupAggregate(select=[COUNT(*) AS c, COUNT(i) AS ck]) : : : : +- Exchange(distribution=[single]) - : : : : +- Calc(select=[i]) - : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1]) + : : : : +- Calc(select=[i], reuse_id=[1]) + : : : : +- TableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2]) : : : +- Exchange(distribution=[hash[i]]) : : : +- Calc(select=[i, true AS i0]) : : : +- GroupAggregate(groupBy=[i], select=[i]) : : : +- Exchange(distribution=[hash[i]]) - : : : +- Calc(select=[i, true AS i0]) - : : : +- Reused(reference_id=[1]) + : : : +- Reused(reference_id=[1]) : : +- Exchange(distribution=[single]) : : +- GroupAggregate(select=[COUNT(*) AS c, COUNT(EXPR$0) AS ck]) : : +- Exchange(distribution=[single]) - : : +- Calc(select=[CAST(j) AS EXPR$0]) - : : +- Reused(reference_id=[1]) 
+ : : +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3]) + : : +- Reused(reference_id=[2]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- Calc(select=[EXPR$0, true AS i]) : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) - : +- Calc(select=[CAST(j) AS EXPR$0, true AS i]) - : +- Reused(reference_id=[1]) + : +- Reused(reference_id=[3]) +- Exchange(distribution=[hash[f]]) +- Calc(select=[d, f]) +- TableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml index fd6047b..e4218f0 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml @@ -130,7 +130,7 @@ LogicalProject(four=[$1], EXPR$0=[$2]) Calc(select=[4 AS four, EXPR$0]) +- GroupAggregate(groupBy=[a], select=[a, SUM(b) AS EXPR$0]) +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, 4 AS four, b]) + +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -149,7 +149,7 @@ LogicalProject(four=[$1], EXPR$0=[$2]) Calc(select=[4 AS four, EXPR$0]) +- GroupAggregate(groupBy=[b], select=[b, SUM(a) AS EXPR$0]) +- Exchange(distribution=[hash[b]]) - +- Calc(select=[b, 4 AS four, a]) + +- Calc(select=[b, a]) +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml index 8b59588..9bcdc6b 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml @@ -91,7 +91,7 @@ Calc(select=[4 AS four, EXPR$0]) +- GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$0]) +- Exchange(distribution=[hash[a]]) +- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0]) - +- Calc(select=[a, 4 AS four, b]) + +- Calc(select=[a, b]) +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -112,7 +112,7 @@ Calc(select=[4 AS four, EXPR$0]) +- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$0]) +- Exchange(distribution=[hash[b]]) +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0]) - +- Calc(select=[b, 4 AS four, a]) + +- Calc(select=[b, a]) +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala index 8848881..d62a84d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala +++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala @@ -704,13 +704,13 @@ class DecimalTypeTest extends ExpressionTestBase { 'f42 % 'f41, "f42 % f41", "mod(f42, f41)", - "2.00") + "2.0000") testAllApis( 'f41 % 'f43, "f41 % f43", "mod(f41, f43)", - "3") + "3.00") testAllApis( 'f43 % 'f41, @@ -749,7 +749,7 @@ class DecimalTypeTest extends ExpressionTestBase { 'f46 % 'f47, "f46 % f47", "mod(f46, f47)", - "3.12") + "3.1234") } @Test // functions that treat Decimal as exact value diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala index 09d4916..f4814c4 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala @@ -162,25 +162,4 @@ class ShuffledHashSemiAntiJoinTest extends SemiAntiJoinTestBase { super.testNotInWithUncorrelated_SimpleCondition3() } - @Test - override def testExistsWithCorrelated_LateralTableInSubQuery(): Unit = { - thrown.expect(classOf[TableException]) - thrown.expectMessage("Cannot generate a valid execution plan for the given query") - super.testExistsWithCorrelated_LateralTableInSubQuery() - } - - @Test - override def testInWithUncorrelated_LateralTableInSubQuery(): Unit = { - thrown.expect(classOf[TableException]) - thrown.expectMessage("Cannot generate a valid execution plan for the given query") - super.testInWithUncorrelated_LateralTableInSubQuery() - } - - @Test - override def testInWithCorrelated_LateralTableInSubQuery(): Unit = { - thrown.expect(classOf[TableException]) - thrown.expectMessage("Cannot 
generate a valid execution plan for the given query") - super.testInWithCorrelated_LateralTableInSubQuery() - } - } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala index 29ee98d..12e2ee8 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala @@ -43,7 +43,7 @@ class FlinkRexUtilTest { val i_size = rexBuilder.makeInputRef(varcharType, 4) // this predicate contains 95 RexCalls. however, - // if this predicate is converted to CNF, the result contains 736450 RexCalls. + // if this predicate is converted to CNF, the result contains 557715 RexCalls. val predicate = rexBuilder.makeCall(OR, rexBuilder.makeCall(AND, rexBuilder.makeCall(EQUALS, i_manufact, rexBuilder.makeLiteral("able")), @@ -181,10 +181,10 @@ class FlinkRexUtilTest { val newPredicate1 = FlinkRexUtil.toCnf(rexBuilder, -1, predicate) assertEquals(predicate.toString, newPredicate1.toString) - val newPredicate2 = FlinkRexUtil.toCnf(rexBuilder, 736449, predicate) + val newPredicate2 = FlinkRexUtil.toCnf(rexBuilder, 557714, predicate) assertEquals(predicate.toString, newPredicate2.toString) - val newPredicate3 = FlinkRexUtil.toCnf(rexBuilder, 736450, predicate) + val newPredicate3 = FlinkRexUtil.toCnf(rexBuilder, 557715, predicate) assertEquals(RexUtil.toCnf(rexBuilder, predicate).toString, newPredicate3.toString) val newPredicate4 = FlinkRexUtil.toCnf(rexBuilder, Int.MaxValue, predicate) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java index cbb3c5d..dfcced1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java @@ -28,10 +28,15 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.impl.AbstractTable; +import java.util.List; +import java.util.stream.Collectors; + /** * A bridge between a Flink's specific {@link QueryOperationCatalogView} and a Calcite's * {@link org.apache.calcite.schema.Table}. It implements {@link TranslatableTable} interface. 
This enables @@ -49,7 +54,30 @@ public class QueryOperationCatalogViewTable extends AbstractTable implements Tra public static QueryOperationCatalogViewTable createCalciteTable(QueryOperationCatalogView catalogView) { return new QueryOperationCatalogViewTable(catalogView, typeFactory -> { TableSchema tableSchema = catalogView.getSchema(); - return ((FlinkTypeFactory) typeFactory).buildLogicalRowType(tableSchema); + final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; + final RelDataType relType = flinkTypeFactory.buildLogicalRowType(tableSchema); + Boolean[] nullables = tableSchema + .getTableColumns() + .stream() + .map(c -> c.getType().getLogicalType().isNullable()) + .toArray(Boolean[]::new); + final List<RelDataTypeField> fields = relType + .getFieldList() + .stream() + .map(f -> { + boolean nullable = nullables[f.getIndex()]; + if (nullable != f.getType().isNullable() + && !FlinkTypeFactory.isTimeIndicatorType(f.getType())) { + return new RelDataTypeFieldImpl( + f.getName(), + f.getIndex(), + flinkTypeFactory.createTypeWithNullability(f.getType(), nullable)); + } else { + return f; + } + }) + .collect(Collectors.toList()); + return flinkTypeFactory.createStructType(fields); }); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java index c30d741..d5e581a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java @@ -19,9 +19,10 @@ package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.calcite.FlinkTypeFactory; +import 
org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; @@ -43,11 +44,22 @@ public class PlannerQueryOperation implements QueryOperation { RelDataType rowType = calciteTree.getRowType(); String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); - TypeInformation[] fieldTypes = rowType.getFieldList() + DataType[] fieldTypes = rowType.getFieldList() .stream() - .map(field -> FlinkTypeFactory.toTypeInfo(field.getType())).toArray(TypeInformation[]::new); + .map(field -> { + final DataType fieldType = TypeConversions + .fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(field.getType())); + final boolean nullable = field.getType().isNullable(); + if (nullable != fieldType.getLogicalType().isNullable() + && !FlinkTypeFactory.isTimeIndicatorType(field.getType())) { + return nullable ? fieldType.nullable() : fieldType.notNull(); + } else { + return fieldType; + } + }) + .toArray(DataType[]::new); - this.tableSchema = new TableSchema(fieldNames, fieldTypes); + this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build(); } public RelNode getCalciteTree() { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala index 77a5a83..74f71c1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala @@ -376,11 +376,11 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataSetCalc", batchTableNode(table), - term("select", "CASE(=(a, 1), 1, 99) AS correct, rowtime") + term("select", "rowtime, CASE(=(a, 1), 1, 99) AS $f1") ), term("window", "TumblingGroupWindow('w$, 'rowtime, 
900000.millis)"), - term("select", "SUM(correct) AS s, AVG(correct) AS a, start('w$) AS w$start," + - " end('w$) AS w$end, rowtime('w$) AS w$rowtime") + term("select", "SUM($f1) AS s, AVG($f1) AS a, start('w$) AS w$start," + + " end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), term("select", "CAST(s) AS s", "CAST(a) AS a", "CAST(w$start) AS wStart") ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala index 1b3daaa..056c882 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala @@ -64,16 +64,16 @@ class SetOperatorsTest extends TableTestBase { unaryNode( "DataSetCalc", batchTableNode(table1), - term("select", "b_long AS b_long3", "true AS $f0"), + term("select", "b_long", "true AS $f0"), term("where", "IS NOT NULL(b_long)") ), - term("groupBy", "b_long3"), - term("select", "b_long3", "MIN($f0) AS $f1") + term("groupBy", "b_long"), + term("select", "b_long", "MIN($f0) AS $f1") ), - term("select", "b_long3") + term("select", "b_long") ), - term("where", "=(a_long, b_long3)"), - term("join", "a_long", "a_int", "a_string", "b_long3"), + term("where", "=(a_long, b_long)"), + term("join", "a_long", "a_int", "a_string", "b_long"), term("joinType", "InnerJoin") ), term("select", "a_int", "a_string") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala index c501390..ede7541 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala @@ -351,7 +351,7 @@ class CalcTest extends TableTestBase { term("groupBy", "word"), term("select", "word", "SUM(frequency) AS EXPR$0") ), - term("select", "word, EXPR$0 AS frequency"), + term("select", "word, EXPR$0"), term("where", "=(EXPR$0, 2)") ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala index c7c4aeb..733470d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala @@ -244,9 +244,9 @@ class GroupWindowTest extends TableTestBase { "rowtime('w$) AS w$rowtime", "proctime('w$) AS w$proctime") ), - term("select", "w$rowtime AS zzzzz") + term("select", "w$rowtime AS $f2") ), - term("window", "TumblingGroupWindow('w$, 'zzzzz, 4.millis)"), + term("window", "TumblingGroupWindow('w$, '$f2, 4.millis)"), term("select", "COUNT(*) AS a", "start('w$) AS w$start", @@ -329,12 +329,12 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(table), - term("select", "CASE(=(a, 1), 1, 99) AS correct", "rowtime") + term("select", "rowtime", "CASE(=(a, 1), 1, 99) AS $f1") ), term("window", "TumblingGroupWindow('w$, 'rowtime, 900000.millis)"), term("select", - "SUM(correct) AS s", - "AVG(correct) AS a", + "SUM($f1) AS s", + "AVG($f1) AS a", "start('w$) AS w$start", "end('w$) AS w$end", "rowtime('w$) AS w$rowtime", diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index 89ad599..dcc86ab 100644 --- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -290,7 +290,7 @@ class JoinTest extends TableTestBase { ), unaryNode("DataStreamCalc", streamTableNode(t2), - term("select", "a", "c", "proctime", "CAST(12:BIGINT) AS nullField") + term("select", "a", "c", "proctime", "12:BIGINT AS nullField") ), term("where", "AND(=(a, a0), =(nullField, nullField0), >=(PROCTIME(proctime), " + "-(PROCTIME(proctime0), 5000:INTERVAL SECOND)), <=(PROCTIME(proctime), " + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala index 05874bc..cad41e9 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala @@ -139,15 +139,11 @@ class TemporalTableJoinTest extends TableTestBase { "DataStreamCalc", binaryNode( "DataStreamTemporalTableJoin", - unaryNode( - "DataStreamCalc", - streamTableNode(orders), - term("select", "o_rowtime, o_amount, o_currency, o_secondary_key") - ), + streamTableNode(orders), unaryNode( "DataStreamCalc", streamTableNode(ratesHistory), - term("select", "rowtime, currency, rate, secondary_key"), + term("select", "rowtime, comment, currency, rate, secondary_key"), term("where", ">(rate, 110:BIGINT)") ), term( @@ -158,10 +154,12 @@ class TemporalTableJoinTest extends TableTestBase { term( "join", "o_rowtime", + "o_comment", "o_amount", "o_currency", "o_secondary_key", "rowtime", + "comment", "currency", "rate", "secondary_key"), @@ -224,15 +222,12 @@ class TemporalTableJoinTest extends TableTestBase { binaryNode( "DataStreamTemporalTableJoin", 
streamTableNode(proctimeOrders), - unaryNode( - "DataStreamCalc", - streamTableNode(proctimeRatesHistory), - term("select", "currency, rate")), + streamTableNode(proctimeRatesHistory), term("where", "AND(" + s"${TEMPORAL_JOIN_CONDITION.getName}(o_proctime, currency), " + "=(currency, o_currency))"), - term("join", "o_amount", "o_currency", "o_proctime", "currency", "rate"), + term("join", "o_amount", "o_currency", "o_proctime", "currency", "rate", "proctime"), term("joinType", "InnerJoin") ), term("select", "*(o_amount, rate) AS rate") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala index 5e69382..0c0f096 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala @@ -111,15 +111,11 @@ class TemporalTableJoinTest extends TableTestBase { "DataStreamCalc", binaryNode( "DataStreamTemporalTableJoin", - unaryNode( - "DataStreamCalc", - streamTableNode(orders), - term("select", "o_rowtime, o_amount, o_currency, o_secondary_key") - ), + streamTableNode(orders), unaryNode( "DataStreamCalc", streamTableNode(ratesHistory), - term("select", "rowtime, currency, rate, secondary_key"), + term("select", "rowtime, comment, currency, rate, secondary_key"), term("where", ">(rate, 110:BIGINT)") ), term( @@ -130,10 +126,12 @@ class TemporalTableJoinTest extends TableTestBase { term( "join", "o_rowtime", + "o_comment", "o_amount", "o_currency", "o_secondary_key", "rowtime", + "comment", "currency", "rate", "secondary_key"), @@ -240,15 +238,12 @@ class TemporalTableJoinTest extends TableTestBase { binaryNode( "DataStreamTemporalTableJoin", streamTableNode(proctimeOrders), - unaryNode( - "DataStreamCalc", - 
streamTableNode(proctimeRatesHistory), - term("select", "currency, rate")), + streamTableNode(proctimeRatesHistory), term("where", "AND(" + s"${TEMPORAL_JOIN_CONDITION.getName}(o_proctime, currency), " + "=(currency, o_currency))"), - term("join", "o_amount", "o_currency", "o_proctime", "currency", "rate"), + term("join", "o_amount", "o_currency", "o_proctime", "currency", "rate", "proctime"), term("joinType", "InnerJoin") ), term("select", "*(o_amount, rate) AS rate") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala index 2e66dc8..1a83546 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala @@ -423,8 +423,8 @@ class RexProgramExtractorTest extends RexProgramTestBase { ) assertExpressionArrayEquals(expected, convertedExpressions) assertEquals(2, unconvertedRexNodes.length) - assertEquals(">(CAST($2):BIGINT NOT NULL, 100)", unconvertedRexNodes(0).toString) - assertEquals("OR(>(CAST($2):BIGINT NOT NULL, 100), <=($2, $1))", + assertEquals("<(100, CAST($2):BIGINT NOT NULL)", unconvertedRexNodes(0).toString) + assertEquals("OR(>=($1, $2), <(100, CAST($2):BIGINT NOT NULL))", unconvertedRexNodes(1).toString) } diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out index 1ef95b0..40fce87 100644 --- a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out +++ b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out @@ -1,6 +1,6 @@ == Abstract Syntax Tree == LogicalProject(first=[$0]) - 
EnumerableTableScan(table=[[default_catalog, default_database, MyTable]]) + LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) == Optimized Logical Plan == StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[first], source=[CsvTableSource(read fields: first)])
