This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7dad5c47c92db305dc39f58acb1f8cf88b6eaed1 Author: yuzhao.cyz <[email protected]> AuthorDate: Tue Mar 17 20:36:40 2020 +0800 [FLINK-14338][table-planner][table-planner-blink] Tweak implementations due to API change * Replace RelOptUtils.createCastRel with RelOptUtil.createCastRel * Implement RelOptTable#getKeys and Statistic#getKeys * Changes logical nodes constructor for hints * Implement RelShuttle.visit(LogicalTableModify) --- .../table/planner/catalog/SqlCatalogViewTable.java | 4 +- .../plan/schema/FlinkPreparingTableBase.java | 5 ++ .../table/planner/plan/utils/RelOptUtils.java | 85 ---------------------- .../planner/calcite/FlinkLogicalRelFactories.scala | 17 ++++- .../table/planner/calcite/FlinkPlannerImpl.scala | 4 + .../table/planner/calcite/FlinkRelOptCluster.scala | 77 -------------------- .../calcite/RelTimeIndicatorConverter.scala | 6 ++ .../planner/plan/nodes/common/CommonCalc.scala | 6 +- .../plan/nodes/logical/FlinkLogicalAggregate.scala | 8 +- .../logical/FlinkLogicalDataStreamTableScan.scala | 4 +- .../plan/nodes/logical/FlinkLogicalJoin.scala | 6 +- .../logical/FlinkLogicalTableSourceScan.scala | 4 +- .../table/planner/plan/stats/FlinkStatistic.scala | 6 +- .../table/planner/plan/utils/AggregateUtil.scala | 4 - .../table/planner/plan/utils/RelShuttles.scala | 2 + .../flink/table/planner/sinks/TableSinkUtils.scala | 7 +- .../planner/plan/common/ViewsExpandingTest.scala | 1 - .../metadata/AggCallSelectivityEstimatorTest.scala | 10 +-- .../plan/metadata/SelectivityEstimatorTest.scala | 7 +- .../optimize/program/FlinkChainedProgramTest.scala | 7 +- .../table/calcite/RelTimeIndicatorConverter.scala | 7 ++ .../flink/table/plan/stats/FlinkStatistic.scala | 2 + 22 files changed, 81 insertions(+), 198 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java index a9f56e1..cf2fb35 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java @@ -21,9 +21,9 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; -import org.apache.flink.table.planner.plan.utils.RelOptUtils; import org.apache.calcite.plan.RelOptSchema; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; @@ -57,6 +57,6 @@ public class SqlCatalogViewTable extends ExpandingPreparingTable { RelNode original = context .expandView(rowType, view.getExpandedQuery(), viewPath, names) .project(); - return RelOptUtils.createCastRel(original, rowType); + return RelOptUtil.createCastRel(original, rowType, true); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java index ef4ff37..98d1711 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java @@ -268,6 +268,11 @@ public abstract class FlinkPreparingTableBase extends Prepare.AbstractPreparingT } } + @Override + public List<ImmutableBitSet> getKeys() { + return statistic.getKeys(); + } + /** * Returns unique keySets of current table. */ diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java deleted file mode 100644 index d3dfb3e..0000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.utils; - -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.RelFactories; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexUtil; - -import java.util.ArrayList; -import java.util.List; - -/** - * <code>RelOptUtils</code> defines static utility methods for use in optimizing - * {@link RelNode}s. - * - * <p>This is an extension of {@link org.apache.calcite.plan.RelOptUtil}. - */ -public class RelOptUtils { - /** - * Creates a projection which casts a rel's output to a desired row type. - * - * <p>This method is inspired by {@link RelOptUtil#createCastRel}, different with that, - * we do not generate another {@link Project} if the {@code rel} is already a {@link Project}. - * - * @param rel Producer of rows to be converted - * @param castRowType Row type after cast - * @return Conversion rel with castRowType - */ - public static RelNode createCastRel(RelNode rel, RelDataType castRowType) { - RelFactories.ProjectFactory projectFactory = RelFactories.DEFAULT_PROJECT_FACTORY; - final RelDataType oriRowType = rel.getRowType(); - if (RelOptUtil.areRowTypesEqual(oriRowType, castRowType, true)) { - // nothing to do - return rel; - } - final RexBuilder rexBuilder = rel.getCluster().getRexBuilder(); - - final List<RelDataTypeField> fieldList = oriRowType.getFieldList(); - int n = fieldList.size(); - assert n == castRowType.getFieldCount() - : "field count: lhs [" + castRowType + "] rhs [" + oriRowType + "]"; - - final List<RexNode> rhsExps; - final RelNode input; - if (rel instanceof Project) { - rhsExps = ((Project) rel).getProjects(); - // Avoid to generate redundant project node. - input = rel.getInput(0); - } else { - rhsExps = new ArrayList<>(); - for (RelDataTypeField field : fieldList) { - rhsExps.add(rexBuilder.makeInputRef(field.getType(), field.getIndex())); - } - input = rel; - } - - final List<RexNode> castExps = - RexUtil.generateCastExpressions(rexBuilder, castRowType, rhsExps); - // Use names and types from castRowType. - return projectFactory.createProject(input, castExps, - castRowType.getFieldNames()); - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala index 99260d3..56ddbf0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala @@ -25,10 +25,12 @@ import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType} import org.apache.flink.table.sinks.TableSink import com.google.common.collect.ImmutableList +import org.apache.calcite.plan.RelOptTable.ToRelContext import org.apache.calcite.plan.{Contexts, RelOptCluster, RelOptTable} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.calcite.rel.core.RelFactories._ import org.apache.calcite.rel.core._ +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical._ import org.apache.calcite.rel.{RelCollation, RelNode} import org.apache.calcite.rex._ @@ -82,6 +84,7 @@ object FlinkLogicalRelFactories { class ProjectFactoryImpl extends ProjectFactory { def createProject( input: RelNode, + hints: util.List[RelHint], childExprs: util.List[_ <: RexNode], fieldNames: util.List[String]): RelNode = { val rexBuilder = input.getCluster.getRexBuilder @@ -134,6 +137,7 @@ object FlinkLogicalRelFactories { class AggregateFactoryImpl extends AggregateFactory { def createAggregate( input: RelNode, + hints: util.List[RelHint], groupSet: ImmutableBitSet, groupSets: ImmutableList[ImmutableBitSet], aggCalls: util.List[AggregateCall]): RelNode = { @@ -168,6 +172,7 @@ object FlinkLogicalRelFactories { def createJoin( left: RelNode, right: RelNode, + hints: util.List[RelHint], condition: RexNode, variablesSet: util.Set[CorrelationId], joinType: JoinRelType, @@ -208,14 +213,18 @@ object FlinkLogicalRelFactories { * [[FlinkLogicalTableSourceScan]] or [[FlinkLogicalDataStreamTableScan]]. */ class TableScanFactoryImpl extends TableScanFactory { - def createScan(cluster: RelOptCluster, table: RelOptTable): RelNode = { - val tableScan = LogicalTableScan.create(cluster, table) + def createScan(toRelContext: ToRelContext, table: RelOptTable): RelNode = { + val cluster = toRelContext.getCluster + val hints = toRelContext.getTableHints + val tableScan = LogicalTableScan.create(cluster, table, hints) tableScan match { case s: LogicalTableScan if FlinkLogicalTableSourceScan.isTableSourceScan(s) => - FlinkLogicalTableSourceScan.create(cluster, + FlinkLogicalTableSourceScan.create( + cluster, s.getTable.asInstanceOf[FlinkPreparingTableBase]) case s: LogicalTableScan if FlinkLogicalDataStreamTableScan.isDataStreamTableScan(s) => - FlinkLogicalDataStreamTableScan.create(cluster, + FlinkLogicalDataStreamTableScan.create( + cluster, s.getTable.asInstanceOf[FlinkPreparingTableBase]) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index 6426436..e28e1e7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -22,10 +22,12 @@ import org.apache.flink.sql.parser.ExtendedSqlNode import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader +import com.google.common.collect.ImmutableList import org.apache.calcite.config.NullCollation import org.apache.calcite.plan._ import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.{RelFieldCollation, RelRoot} import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator} import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable} @@ -190,6 +192,8 @@ class FlinkPlannerImpl( } override def getCluster: RelOptCluster = cluster + + override def getTableHints: util.List[RelHint] = ImmutableList.of() } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala deleted file mode 100644 index d9d8b33..0000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.calcite - -import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery - -import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner, RelOptRule, RelOptRuleCall} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataTypeFactory -import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rex.RexBuilder - -import java.util -import java.util.concurrent.atomic.AtomicInteger - -/** - * Flink specific [[RelOptCluster]] to use [[FlinkRelMetadataQuery]] - * instead of [[RelMetadataQuery]]. - */ -class FlinkRelOptCluster( - planner: RelOptPlanner, - typeFactory: RelDataTypeFactory, - rexBuilder: RexBuilder, - nextCorrel: AtomicInteger, - mapCorrelToRel: util.Map[String, RelNode]) - extends RelOptCluster(planner, typeFactory, rexBuilder, nextCorrel, mapCorrelToRel) { - - private var fmq: FlinkRelMetadataQuery = _ - - /** - * Returns the current [[FlinkRelMetadataQuery]] instead of [[RelMetadataQuery]]. - * - * <p>This method might be changed or moved in future. - * If you have a [[RelOptRuleCall]] available, - * for example if you are in a [[RelOptRule#onMatch(RelOptRuleCall)]] - * method, then use [[RelOptRuleCall#getMetadataQuery()]] instead. - */ - override def getMetadataQuery: RelMetadataQuery = { - if (fmq == null) { - fmq = FlinkRelMetadataQuery.instance() - } - fmq - } - - /** - * Should be called whenever the current [[FlinkRelMetadataQuery]] becomes - * invalid. Typically invoked from [[RelOptRuleCall#transformTo]]. - */ - override def invalidateMetadataQuery(): Unit = fmq = null -} - -object FlinkRelOptCluster { - /** Creates a FlinkRelOptCluster instance. */ - def create(planner: RelOptPlanner, rexBuilder: RexBuilder): RelOptCluster = - new FlinkRelOptCluster( - planner, - rexBuilder.getTypeFactory, - rexBuilder, - new AtomicInteger(0), - new util.HashMap[String, RelNode]) -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala index fa6eb10..b6bec80 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala @@ -35,6 +35,8 @@ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.sql.fun.SqlStdOperatorTable.FINAL +import java.util.{Collections => JCollections} + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable @@ -464,6 +466,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { updatedAggCalls) } + override def visit(modify: LogicalTableModify): RelNode = { + val input = modify.getInput.accept(this) + modify.copy(modify.getTraitSet, JCollections.singletonList(input)) + } } object RelTimeIndicatorConverter { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala index 532e9df..5da5d9b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala @@ -21,12 +21,16 @@ package org.apache.flink.table.planner.plan.nodes.common import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat import org.apache.flink.table.planner.plan.nodes.{ExpressionFormat, FlinkRelNode} import org.apache.flink.table.planner.plan.utils.RelExplainUtil.{conditionToString, preferExpressionFormat} + import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexProgram} +import java.util.Collections + import scala.collection.JavaConversions._ /** @@ -37,7 +41,7 @@ abstract class CommonCalc( traitSet: RelTraitSet, input: RelNode, calcProgram: RexProgram) - extends Calc(cluster, traitSet, input, calcProgram) + extends Calc(cluster, traitSet, Collections.emptyList[RelHint](), input, calcProgram) with FlinkRelNode { override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala index 476fe4a..2307c72 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala @@ -26,12 +26,14 @@ import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.{Aggregate, AggregateCall} +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.LogicalAggregate import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.sql.SqlKind import org.apache.calcite.util.ImmutableBitSet import java.util +import java.util.Collections import scala.collection.JavaConversions._ @@ -48,7 +50,8 @@ class FlinkLogicalAggregate( aggCalls: util.List[AggregateCall], /* flag indicating whether to skip SplitAggregateRule */ var partialFinalType: PartialFinalType = PartialFinalType.NONE) - extends Aggregate(cluster, traitSet, child, groupSet, groupSets, aggCalls) + extends Aggregate(cluster, traitSet, Collections.emptyList[RelHint](), + child, groupSet, groupSets, aggCalls) with FlinkLogicalRel { def setPartialFinalType(partialFinalType: PartialFinalType): Unit = { @@ -78,7 +81,6 @@ class FlinkLogicalAggregate( planner.getCostFactory.makeCost(rowCnt, cpuCost, rowCnt * rowSize) } } - } private class FlinkLogicalAggregateBatchConverter @@ -157,6 +159,6 @@ object FlinkLogicalAggregate { aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = { val cluster = input.getCluster val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify() - new FlinkLogicalAggregate(cluster,traitSet, input, groupSet, groupSets, aggCalls) + new FlinkLogicalAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala index b104a66..d630e85 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala @@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList import org.apache.calcite.plan._ import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode} import java.util +import java.util.Collections import java.util.function.Supplier /** @@ -40,7 +42,7 @@ class FlinkLogicalDataStreamTableScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable) - extends TableScan(cluster, traitSet, table) + extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), table) with FlinkLogicalRel { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala index cfe6f71..cbc8396 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala @@ -24,10 +24,13 @@ import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType} +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.LogicalJoin import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rex.RexNode +import java.util.Collections + import scala.collection.JavaConversions._ /** @@ -41,7 +44,8 @@ class FlinkLogicalJoin( right: RelNode, condition: RexNode, joinType: JoinRelType) - extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId], joinType) + extends Join(cluster, traitSet, Collections.emptyList[RelHint](), + left, right, condition, Set.empty[CorrelationId], joinType) with FlinkLogicalRel { override def copy( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index f28328d..396318a7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -28,11 +28,13 @@ import org.apache.calcite.plan._ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter} import java.util +import java.util.Collections import java.util.function.Supplier /** @@ -43,7 +45,7 @@ class FlinkLogicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, relOptTable: TableSourceTable[_]) - extends TableScan(cluster, traitSet, relOptTable) + extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), relOptTable) with FlinkLogicalRel { lazy val tableSource: TableSource[_] = tableSourceTable.tableSource diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala index f2d6c51..37ff255 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala @@ -21,11 +21,11 @@ package org.apache.flink.table.planner.plan.stats import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity +import com.google.common.collect.ImmutableList import org.apache.calcite.rel.{RelCollation, RelDistribution, RelReferentialConstraint} import org.apache.calcite.schema.Statistic import org.apache.calcite.util.ImmutableBitSet -import java.lang.Double import java.util import scala.collection.JavaConversions._ @@ -80,7 +80,7 @@ class FlinkStatistic( * * @return The number of rows of the table. */ - override def getRowCount: Double = { + override def getRowCount: java.lang.Double = { if (tableStats != TableStats.UNKNOWN) { val rowCount = tableStats.getRowCount.toDouble // rowCount requires non-negative number @@ -136,6 +136,8 @@ class FlinkStatistic( } builder.toString() } + + override def getKeys: util.List[ImmutableBitSet] = ImmutableList.of() } /** diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala index 674fb2b..760306e8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala @@ -128,10 +128,6 @@ object AggregateUtil extends Enumeration { require(auxGroupCalls.isEmpty, "AUXILIARY_GROUP aggCalls should be empty when groupSet is empty") } - if (agg.indicator) { - require(auxGroupCalls.isEmpty, - "AUXILIARY_GROUP aggCalls should be empty when indicator is true") - } val auxGrouping = auxGroupCalls.map(_.getArgList.head.toInt).toArray require(auxGrouping.length + otherAggCalls.length == aggCalls.length) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala index 4f74d24..077c8c1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala @@ -72,6 +72,8 @@ class DefaultRelShuttle extends RelShuttle { override def visit(join: LogicalJoin): RelNode = visit(join.asInstanceOf[RelNode]) override def visit(correlate: LogicalCorrelate): RelNode = visit(correlate.asInstanceOf[RelNode]) + + override def visit(modify: LogicalTableModify): RelNode = visit(modify.asInstanceOf[RelNode]) } /** diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala index 84397c9..d4a6d3b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.planner.sinks -import org.apache.calcite.rel.RelNode import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo @@ -27,7 +26,6 @@ import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier} import org.apache.flink.table.dataformat.BaseRow import org.apache.flink.table.operations.CatalogSinkModifyOperation import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.plan.utils.RelOptUtils import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo import org.apache.flink.table.sinks._ @@ -41,6 +39,9 @@ import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataT import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils} import org.apache.flink.types.Row +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.RelNode + import _root_.scala.collection.JavaConversions._ object TableSinkUtils { @@ -78,7 +79,7 @@ object TableSinkUtils { val castedDataType = typeFactory.buildRelNodeRowType( sinkLogicalType.getFieldNames, sinkLogicalType.getFields.map(_.getType)) - RelOptUtils.createCastRel(query, castedDataType) + RelOptUtil.createCastRel(query, castedDataType, true) } } else { // format query and sink schema strings diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala index efed57a..074a072 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala @@ -59,7 +59,6 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => TableTestUtil) extends val tableUtil = tableTestUtil(this) val tableEnv = tableUtil.tableEnv tableUtil.addDataStream[(Int, String, Int)]("t1", 'a, 'b, 'c) - val catalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog).get() tableEnv.createTemporaryView("view1", tableEnv.from("t1")) tableEnv.createTemporaryView("view2", tableEnv.from("view1")) tableEnv.createTemporaryView("view3", tableEnv.from("view2")) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala index 0fa3485..e4ddf88 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala @@ -19,13 +19,16 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} +import org.apache.flink.table.catalog.FunctionCatalog +import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} import org.apache.flink.table.planner.calcite.{FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.planner.plan.schema._ import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.planner.{JDouble, JLong} +import org.apache.flink.table.utils.CatalogManagerMocks import org.apache.flink.util.Preconditions + import com.google.common.collect.ImmutableList import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster} import org.apache.calcite.rel.`type`.RelDataType @@ -45,10 +48,8 @@ import org.junit.{Before, BeforeClass, Test} import org.powermock.api.mockito.PowerMockito._ import org.powermock.core.classloader.annotations.PrepareForTest import org.powermock.modules.junit4.PowerMockRunner -import java.math.BigDecimal -import org.apache.flink.table.module.ModuleManager -import org.apache.flink.table.utils.CatalogManagerMocks +import java.math.BigDecimal import scala.collection.JavaConversions._ @@ -634,4 +635,3 @@ object AggCallSelectivityEstimatorTest { } } - diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala index 911d732..031a668 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala @@ -26,7 +26,9 @@ import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, F import org.apache.flink.table.planner.plan.schema._ import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.planner.{JDouble, JLong} +import org.apache.flink.table.utils.CatalogManagerMocks import org.apache.flink.util.Preconditions + import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan @@ -43,11 +45,10 @@ import org.junit.{Before, BeforeClass, Test} import org.powermock.api.mockito.PowerMockito._ import org.powermock.core.classloader.annotations.PrepareForTest import org.powermock.modules.junit4.PowerMockRunner + import java.math.BigDecimal import java.sql.{Date, Time, Timestamp} -import org.apache.flink.table.utils.CatalogManagerMocks - import scala.collection.JavaConverters._ /** @@ -1111,6 +1112,4 @@ object SelectivityEstimatorTest { .THREAD_PROVIDERS .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE)) } - } - diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala index 8b6039b..10cec79 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala @@ -26,6 +26,8 @@ import org.apache.calcite.tools.RuleSets import org.junit.Assert._ import org.junit.Test +import java.util.Collections + import scala.collection.JavaConversions._ /** @@ -49,10 +51,7 @@ class FlinkChainedProgramTest { .addRuleInstance(SubQueryRemoveRule.JOIN) .addMatchLimit(100) .addMatchOrder(HepMatchOrder.BOTTOM_UP) - .addRuleCollection(Array( - TableScanRule.INSTANCE, - ValuesReduceRule.FILTER_INSTANCE - ).toList) + .addRuleCollection(Collections.singletonList(ValuesReduceRule.FILTER_INSTANCE)) val program1 = FlinkHepProgram(builder.build()) assertTrue(programs.addFirst("o2", program1)) assertEquals(List("o2"), programs.getProgramNames.toList) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala index d3aaf7d..7f85245 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -34,6 +34,8 @@ import org.apache.flink.table.functions.sql.ProctimeSqlFunction import org.apache.flink.table.plan.logical.rel._ import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import java.util.{Collections => JCollections} + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable @@ -377,6 +379,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { rexBuilder, inputs.flatMap(_.getRowType.getFieldList.map(_.getType))) } + + override def visit(modify: LogicalTableModify): RelNode = { + val input = modify.getInput.accept(this) + modify.copy(modify.getTraitSet, JCollections.singletonList(input)) + } } object RelTimeIndicatorConverter { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala index 754509e..957bdf6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala @@ -70,6 +70,8 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic { override def getReferentialConstraints: util.List[RelReferentialConstraint] = Collections.emptyList() + + override def getKeys: util.List[ImmutableBitSet] = Collections.emptyList() } /**
