This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57f8d58614f5c5355d1a4780fc7c08b80a488061 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Thu Dec 8 18:52:56 2022 +0100 [hotfix] replace deprecated calcite apis --- .../planner/delegation/hive/HiveParserCalcitePlanner.java | 11 +++++++++-- .../planner/delegation/hive/HiveParserRexNodeConverter.java | 3 +-- .../delegation/hive/copy/HiveParserSqlCountAggFunction.java | 7 ++++++- .../delegation/hive/copy/HiveParserSqlMinMaxAggFunction.java | 7 ++++++- .../delegation/hive/copy/HiveParserSqlSumAggFunction.java | 7 ++++++- .../table/planner/expressions/converter/OverConvertRule.java | 3 ++- .../flink/table/planner/functions/sql/SqlListAggFunction.java | 4 +++- .../functions/sql/internal/SqlAuxiliaryGroupAggFunction.java | 4 +++- .../logical/FlinkAggregateExpandDistinctAggregatesRule.java | 2 +- .../rules/logical/FlinkSemiAntiJoinFilterTransposeRule.java | 4 +++- .../flink/table/planner/plan/metadata/FlinkRelMdSize.scala | 6 ++++-- .../planner/plan/rules/logical/CalcRankTransposeRule.scala | 2 +- .../plan/rules/logical/DecomposeGroupingSetsRule.scala | 2 +- .../planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala | 2 +- .../planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala | 4 ++-- .../plan/rules/logical/ProjectSemiAntiJoinTransposeRule.scala | 2 +- .../table/planner/plan/rules/logical/SplitAggregateRule.scala | 4 +--- .../flink/table/planner/plan/trait/FlinkRelDistribution.scala | 2 +- .../org/apache/flink/table/planner/plan/trait/TraitUtil.scala | 2 +- .../plan/metadata/AggCallSelectivityEstimatorTest.scala | 2 ++ .../flink/table/planner/plan/utils/FlinkRexUtilTest.scala | 2 +- 21 files changed, 56 insertions(+), 26 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java index 1a362b82f7a..63546109963 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java @@ -60,6 +60,8 @@ import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.ViewExpanders; @@ -1218,7 +1220,8 @@ public class HiveParserCalcitePlanner { gbInputRexNodes.add(cluster.getRexBuilder().makeInputRef(srcRel, 0)); } - return LogicalAggregate.create(gbInputRel, groupSet, transformedGroupSets, aggregateCalls); + return LogicalAggregate.create( + gbInputRel, ImmutableList.of(), groupSet, transformedGroupSets, aggregateCalls); } // Generate GB plan. @@ -2409,7 +2412,11 @@ public class HiveParserCalcitePlanner { ImmutableBitSet.range(res.getRowType().getFieldList().size()); res = LogicalAggregate.create( - res, groupSet, Collections.emptyList(), Collections.emptyList()); + res, + ImmutableList.of(), + groupSet, + Collections.emptyList(), + Collections.emptyList()); HiveParserRowResolver groupByOutputRowResolver = new HiveParserRowResolver(); for (int i = 0; i < outRR.getColumnInfos().size(); i++) { ColumnInfo colInfo = outRR.getColumnInfos().get(i); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java index a12570268a2..5c5f67aa68e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java @@ -761,8 +761,7 @@ public class HiveParserRexNodeConverter { .makeNullLiteral( newChildRexNodeLst .get(newChildRexNodeLst.size() - 1) - .getType() - .getSqlTypeName())); + .getType())); } return newChildRexNodeLst; } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlCountAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlCountAggFunction.java index 40419c04e88..fdf25dcf8ff 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlCountAggFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlCountAggFunction.java @@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlOperandTypeInference; import org.apache.calcite.sql.type.SqlReturnTypeInference; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Optionality; /** * Counterpart of hive's @@ -50,11 +51,15 @@ public class HiveParserSqlCountAggFunction extends SqlAggFunction SqlOperandTypeChecker operandTypeChecker) { super( "count", + null, SqlKind.COUNT, returnTypeInference, operandTypeInference, operandTypeChecker, - SqlFunctionCategory.NUMERIC); + SqlFunctionCategory.NUMERIC, + false, + false, + Optionality.FORBIDDEN); this.isDistinct = isDistinct; this.returnTypeInference = returnTypeInference; this.operandTypeChecker = operandTypeChecker; diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlMinMaxAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlMinMaxAggFunction.java index 996407be806..3c7de168fe9 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlMinMaxAggFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlMinMaxAggFunction.java @@ -25,6 +25,7 @@ import org.apache.calcite.sql.SqlSplittableAggFunction; import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlOperandTypeInference; import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.calcite.util.Optionality; /** * Counterpart of hive's @@ -39,11 +40,15 @@ public class HiveParserSqlMinMaxAggFunction extends SqlAggFunction { boolean isMin) { super( isMin ? "min" : "max", + null, isMin ? SqlKind.MIN : SqlKind.MAX, returnTypeInference, operandTypeInference, operandTypeChecker, - SqlFunctionCategory.NUMERIC); + SqlFunctionCategory.NUMERIC, + false, + false, + Optionality.FORBIDDEN); } @Override diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlSumAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlSumAggFunction.java index 11362dbaa34..6f860b0e46e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlSumAggFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlSumAggFunction.java @@ -36,6 +36,7 @@ import org.apache.calcite.sql.type.SqlOperandTypeInference; import org.apache.calcite.sql.type.SqlReturnTypeInference; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Optionality; import java.util.ArrayList; import java.util.Collections; @@ -62,11 +63,15 @@ public class HiveParserSqlSumAggFunction extends SqlAggFunction SqlOperandTypeChecker operandTypeChecker) { super( "sum", + null, SqlKind.SUM, returnTypeInference, operandTypeInference, operandTypeChecker, - SqlFunctionCategory.NUMERIC); + SqlFunctionCategory.NUMERIC, + false, + false, + Optionality.FORBIDDEN); this.returnTypeInference = returnTypeInference; this.operandTypeChecker = operandTypeChecker; this.operandTypeInference = operandTypeInference; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java index c5fff324c9a..72a50297bcb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java @@ -133,7 +133,8 @@ public class OverConvertRule implements CallExpressionConvertRule { isPhysical, true, false, - isDistinct)); + isDistinct, + false)); } return Optional.empty(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java index 15511623585..c04fd4ca51e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java @@ -29,6 +29,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Optionality; import java.util.List; @@ -57,7 +58,8 @@ public class SqlListAggFunction extends SqlAggFunction { OperandTypes.and(OperandTypes.CHARACTER, OperandTypes.LITERAL))), SqlFunctionCategory.SYSTEM, false, - false); + false, + Optionality.FORBIDDEN); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/internal/SqlAuxiliaryGroupAggFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/internal/SqlAuxiliaryGroupAggFunction.java index 11dcc1072e6..b2a4ef47d70 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/internal/SqlAuxiliaryGroupAggFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/internal/SqlAuxiliaryGroupAggFunction.java @@ -25,6 +25,7 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.util.Optionality; /** * An internal [[SqlAggFunction]] to represents auxiliary group keys which will not be computed as @@ -43,6 +44,7 @@ public class SqlAuxiliaryGroupAggFunction extends SqlAggFunction { OperandTypes.ANY, SqlFunctionCategory.SYSTEM, false, - false); + false, + Optionality.FORBIDDEN); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java index 57f941ea410..2f885852598 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java @@ -470,7 +470,7 @@ public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule aggCall.left.filterArg, aggregate.getGroupCount(), fullGroupSet.cardinality()); - distinctAggCalls.add(newAggCall.rename(aggCall.right)); + distinctAggCalls.add(newAggCall.withName(aggCall.right)); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkSemiAntiJoinFilterTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkSemiAntiJoinFilterTransposeRule.java index c77097308c5..a3d6de0f7ec 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkSemiAntiJoinFilterTransposeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkSemiAntiJoinFilterTransposeRule.java @@ -17,6 +17,8 @@ package org.apache.flink.table.planner.plan.rules.logical; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelNode; @@ -79,7 +81,7 @@ public class FlinkSemiAntiJoinFilterTransposeRule extends RelOptRule { join.getJoinType()); final RelFactories.FilterFactory factory = RelFactories.DEFAULT_FILTER_FACTORY; - RelNode newFilter = factory.createFilter(newJoin, filter.getCondition()); + RelNode newFilter = factory.createFilter(newJoin, filter.getCondition(), ImmutableSet.of()); call.transformTo(newFilter); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala index 0f312c35bfa..9a16bb3756d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala @@ -123,10 +123,12 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] { def averageColumnSizes(rel: Expand, mq: RelMetadataQuery): JList[JDouble] = { val fieldCount = rel.getRowType.getFieldCount + val fieldList = rel.getRowType.getFieldList // get each column's RexNode (RexLiteral, RexInputRef or null) val projectNodes = (0 until fieldCount).map { i => - val initNode: RexNode = rel.getCluster.getRexBuilder.constantNull() + val initNode: RexNode = + rel.getCluster.getRexBuilder.makeNullLiteral(fieldList.get(i).getType) rel.projects.foldLeft(initNode) { (mergeNode, project) => (mergeNode, project.get(i)) match { @@ -149,7 +151,7 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] { case (p, i) => val size = if (p == null || i == rel.expandIdIndex) { // use default value - FlinkRelMdSize.averageTypeValueSize(rel.getRowType.getFieldList.get(i).getType) + FlinkRelMdSize.averageTypeValueSize(fieldList.get(i).getType) } else { // use value from input averageRexSize(p, inputColumnSizes) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala index 454d80fd951..74b611e6330 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala @@ -166,7 +166,7 @@ class CalcRankTransposeRule val oldOrderKey = rank.orderKey val oldFieldCollations = oldOrderKey.getFieldCollations val newFieldCollations = oldFieldCollations.map { - fc => fc.copy(fieldMapping(fc.getFieldIndex)) + fc => fc.withFieldIndex(fieldMapping(fc.getFieldIndex)) } val newOrderKey = if (newFieldCollations.eq(oldFieldCollations)) { oldOrderKey diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala index 723888c436d..140c1d7c92c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.scala @@ -328,7 +328,7 @@ class DecomposeGroupingSetsRule val res: Long = call.getArgList.foldLeft(0L)( (res, arg) => (res << 1L) + (if (groups.contains(arg)) 0L else 1L)) builder.makeLiteral(res, call.getType, false) - case _ => builder.constantNull() + case _ => builder.makeNullLiteral(call.getType) } } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala index e198a1f44c0..3cee597cd1a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala @@ -56,7 +56,7 @@ class FlinkRewriteSubQueryRule( val filter: Filter = call.rel(0) val condition = filter.getCondition val newCondition = rewriteScalarQuery(condition) - if (RexUtil.eq(condition, newCondition)) { + if (condition.equals(newCondition)) { return } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala index 62607017313..518afbf7df7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala @@ -304,13 +304,13 @@ class FlinkSubQueryRemoveRule( replacement: RexNode): RexNode = { condition.accept(new RexShuttle() { override def visitSubQuery(subQuery: RexSubQuery): RexNode = { - if (RexUtil.eq(subQuery, oldSubQueryCall)) replacement else subQuery + if (subQuery.equals(oldSubQueryCall)) replacement else subQuery } override def visitCall(call: RexCall): RexNode = { call.getKind match { case SqlKind.NOT if call.operands.head.isInstanceOf[RexSubQuery] => - if (RexUtil.eq(call, oldSubQueryCall)) replacement else call + if (call.equals(oldSubQueryCall)) replacement else call case _ => super.visitCall(call) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ProjectSemiAntiJoinTransposeRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ProjectSemiAntiJoinTransposeRule.scala index 96b86be053a..f912b020919 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ProjectSemiAntiJoinTransposeRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ProjectSemiAntiJoinTransposeRule.scala @@ -108,7 +108,7 @@ class ProjectSemiAntiJoinTransposeRule inputNeededFields: ImmutableBitSet, offset: Int): RelNode = { val rexBuilder = originInput.getCluster.getRexBuilder - val typeBuilder = new RelDataTypeFactory.FieldInfoBuilder(relBuilder.getTypeFactory) + val typeBuilder = relBuilder.getTypeFactory.builder() val newProjects: util.List[RexNode] = new util.ArrayList[RexNode]() val newFieldNames: util.List[String] = new util.ArrayList[String]() inputNeededFields.toList.foreach { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala index 35f8579b609..5ca5526059d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala @@ -384,9 +384,7 @@ class SplitAggregateRule FlinkSqlOperatorTable.EQUALS, countInputRef, relBuilder.getRexBuilder.makeBigintLiteral(JBigDecimal.valueOf(0))) - val ifTrue = relBuilder.cast( - relBuilder.getRexBuilder.constantNull(), - aggCall.`type`.getSqlTypeName) + val ifTrue = relBuilder.getRexBuilder.makeNullLiteral(aggCall.`type`) val ifFalse = relBuilder.call(FlinkSqlOperatorTable.DIVIDE, sumInputRef, countInputRef) relBuilder.call(FlinkSqlOperatorTable.IF, equals, ifTrue, ifFalse) } else { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala index e7fef969b13..1e1e58953c7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/FlinkRelDistribution.scala @@ -119,7 +119,7 @@ class FlinkRelDistribution private ( try { val i = mapping.getTargetOpt(fieldCollation.getFieldIndex) if (i >= 0) { - newFieldCollations.add(fieldCollation.copy(i)) + newFieldCollations.add(fieldCollation.withFieldIndex(i)) } else { return FlinkRelDistribution.ANY } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala index 247514c6e68..ba42dff0bef 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/TraitUtil.scala @@ -49,7 +49,7 @@ object TraitUtil { fieldCollation => try { val i = mapping.getTargetOpt(fieldCollation.getFieldIndex) - if (i >= 0) newFieldCollations.add(fieldCollation.copy(i)) + if (i >= 0) newFieldCollations.add(fieldCollation.withFieldIndex(i)) else return RelCollations.EMPTY } catch { case _: IndexOutOfBoundsException => return RelCollations.EMPTY diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala index 4d912ebc6df..ae27e1d7f2b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala @@ -30,6 +30,7 @@ import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.RelCollations import org.apache.calcite.rel.core.{Aggregate, AggregateCall, TableScan} +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.LogicalAggregate import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase} import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode} @@ -118,6 +119,7 @@ class AggCallSelectivityEstimatorTest { LogicalAggregate.create( scan, + ImmutableList.of[RelHint], ImmutableBitSet.of(groupSet: _*), null, ImmutableList.copyOf(aggCalls.toArray)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala index 5e7ba810fed..28744d87c2e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala @@ -206,7 +206,7 @@ class FlinkRexUtilTest { assertEquals(RexUtil.toCnf(rexBuilder, predicate).toString, newPredicate3.toString) val newPredicate4 = FlinkRexUtil.toCnf(rexBuilder, Int.MaxValue, predicate) - assertFalse(RexUtil.eq(predicate, newPredicate4)) + assertFalse(predicate.equals(newPredicate4)) assertEquals(RexUtil.toCnf(rexBuilder, predicate).toString, newPredicate4.toString) }
