This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f32ba645246a8180a02182650fb51392facfcc09 Author: lincoln lee <[email protected]> AuthorDate: Fri Sep 9 16:44:03 2022 +0800 [FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node This closes #20800 --- .../java/org/apache/calcite/rel/core/Snapshot.java | 42 ++++++++++++++++++---- .../apache/calcite/rel/hint/HintPredicates.java | 7 ++++ .../calcite/rel/hint/NodeTypeHintPredicate.java | 6 +++- .../calcite/rel/logical/LogicalSnapshot.java | 34 ++++++++++++++++-- 4 files changed, 79 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java index b47aa300d37..531f69345dc 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java @@ -16,17 +16,19 @@ */ package org.apache.calcite.rel.core; +import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.hint.Hintable; +import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Litmus; -import org.checkerframework.checker.nullness.qual.Nullable; import java.util.List; import java.util.Objects; @@ -40,12 +42,17 @@ import java.util.Objects; * Snapshot}(TableScan(Products)) is a relational operator that only returns the contents whose * versions that overlap with the given specific period (i.e. those that started before given period * and ended after it). 
+ * + * <p>Temporarily copy from calcite to cherry-pick [CALCITE-5251] and will be removed when upgrade + * to calcite-1.32.0. */ -public abstract class Snapshot extends SingleRel { +public abstract class Snapshot extends SingleRel implements Hintable { // ~ Instance fields -------------------------------------------------------- private final RexNode period; + protected final ImmutableList<RelHint> hints; + // ~ Constructors ----------------------------------------------------------- /** @@ -53,16 +60,35 @@ public abstract class Snapshot extends SingleRel { * * @param cluster Cluster that this relational expression belongs to * @param traitSet The traits of this relational expression + * @param hints Hints for this node * @param input Input relational expression * @param period Timestamp expression which as the table was at the given time in the past */ - @SuppressWarnings("method.invocation.invalid") - protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) { + protected Snapshot( + RelOptCluster cluster, + RelTraitSet traitSet, + List<RelHint> hints, + RelNode input, + RexNode period) { super(cluster, traitSet, input); this.period = Objects.requireNonNull(period, "period"); + this.hints = ImmutableList.copyOf(hints); assert isValid(Litmus.THROW, null); } + /** + * Creates a Snapshot. 
+ * + * @param cluster Cluster that this relational expression belongs to + * @param traitSet The traits of this relational expression + * @param input Input relational expression + * @param period Timestamp expression which as the table was at the given time in the past + */ + @SuppressWarnings("method.invocation.invalid") + protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) { + this(cluster, traitSet, ImmutableList.of(), input, period); + } + // ~ Methods ---------------------------------------------------------------- @Override @@ -72,7 +98,6 @@ public abstract class Snapshot extends SingleRel { public abstract Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period); - @Override public RelNode accept(RexShuttle shuttle) { RexNode condition = shuttle.apply(this.period); if (this.period == condition) { @@ -91,7 +116,7 @@ public abstract class Snapshot extends SingleRel { } @Override - public boolean isValid(Litmus litmus, @Nullable Context context) { + public boolean isValid(Litmus litmus, RelNode.Context context) { RelDataType dataType = period.getType(); if (dataType.getSqlTypeName() != SqlTypeName.TIMESTAMP) { return litmus.fail( @@ -101,4 +126,9 @@ public abstract class Snapshot extends SingleRel { } return litmus.succeed(); } + + @Override + public ImmutableList<RelHint> getHints() { + return hints; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java index a38ad2e141e..7081c68041a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java @@ -109,6 +109,13 @@ public abstract class HintPredicates { public static final HintPredicate WINDOW = new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.WINDOW); + 
/** + * A hint predicate that indicates a hint can only be used to {@link + * org.apache.calcite.rel.core.Snapshot} nodes. + */ + public static final HintPredicate SNAPSHOT = + new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.SNAPSHOT); + /** * Returns a composed hint predicate that represents a short-circuiting logical AND of an array * of hint predicates {@code hintPredicates}. When evaluating the composed predicate, if a diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java index 4ed6d117c09..9fce804833e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java @@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.Snapshot; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Values; @@ -78,7 +79,10 @@ public class NodeTypeHintPredicate implements HintPredicate { VALUES(Values.class), /** The hint would be propagated to the Window nodes. */ - WINDOW(Window.class); + WINDOW(Window.class), + + /** The hint would be propagated to the Snapshot nodes. */ + SNAPSHOT(Snapshot.class); /** Relational expression clazz that the hint can apply to. 
*/ private Class<?> relClazz; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java index d32e0924dd4..f2e974d3dd3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java @@ -18,6 +18,7 @@ package org.apache.calcite.rel.logical; +import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -25,6 +26,7 @@ import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelDistributionTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Snapshot; +import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.metadata.RelMdCollation; import org.apache.calcite.rel.metadata.RelMdDistribution; import org.apache.calcite.rel.metadata.RelMetadataQuery; @@ -32,16 +34,37 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Litmus; +import java.util.List; + /** * Sub-class of {@link org.apache.calcite.rel.core.Snapshot} not targeted at any particular engine * or calling convention. The class was copied over because of * CALCITE-4554. * * - * <p>Line 80 ~ 91: Calcite only supports timestamp type as period type, but Flink supports both + * <p>Line 106 ~ 117: Calcite only supports timestamp type as period type, but Flink supports both * Timestamp and TimestampLtz. Should be removed once calcite support TimestampLtz as period type. */ public class LogicalSnapshot extends Snapshot { // ~ Constructors ----------------------------------------------------------- + /** + * Creates a LogicalSnapshot. 
+ * + * <p>Use {@link #create} unless you know what you're doing. + * + * @param cluster Cluster that this relational expression belongs to + * @param traitSet The traits of this relational expression + * @param hints Hints for this node + * @param input Input relational expression + * @param period Timestamp expression which as the table was at the given time in the past + */ + public LogicalSnapshot( + RelOptCluster cluster, + RelTraitSet traitSet, + List<RelHint> hints, + RelNode input, + RexNode period) { + super(cluster, traitSet, hints, input, period); + } /** * Creates a LogicalSnapshot. @@ -55,12 +78,12 @@ public class LogicalSnapshot extends Snapshot { */ public LogicalSnapshot( RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) { - super(cluster, traitSet, input, period); + super(cluster, traitSet, ImmutableList.of(), input, period); } @Override public Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period) { - return new LogicalSnapshot(getCluster(), traitSet, input, period); + return new LogicalSnapshot(getCluster(), traitSet, hints, input, period); } /** Creates a LogicalSnapshot. */ @@ -93,4 +116,9 @@ public class LogicalSnapshot extends Snapshot { } return litmus.succeed(); } + + @Override + public RelNode withHints(final List<RelHint> hintList) { + return new LogicalSnapshot(getCluster(), traitSet, hintList, input, getPeriod()); + } }
