LadyForest commented on code in PR #23620:
URL: https://github.com/apache/flink/pull/23620#discussion_r1379750950
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -234,4 +232,112 @@ private RelNode visitBiRel(BiRel biRel) {
}
}
}
+
+ /** Resolve the RelNode of the sub query in the node and return a new
node. */
+ public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
+ if (node instanceof LogicalProject) {
+ LogicalProject project = (LogicalProject) node;
+ List<RexNode> newProjects = resolveSubQuery(project::getProjects,
resolver);
+ return project.copy(
+ project.getTraitSet(), project.getInput(), newProjects,
project.getRowType());
+
+ } else if (node instanceof LogicalFilter) {
+ LogicalFilter filter = (LogicalFilter) node;
+ RexNode newCondition =
+ resolveSubQuery(
+ () ->
Collections.singletonList(filter.getCondition()),
+ resolver)
+ .get(0);
+ return filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
+ } else if (node instanceof LogicalJoin) {
+ LogicalJoin join = (LogicalJoin) node;
+ RexNode newCondition =
+ resolveSubQuery(() ->
Collections.singletonList(join.getCondition()), resolver)
+ .get(0);
+ return join.copy(
+ join.getTraitSet(),
+ newCondition,
+ join.getLeft(),
+ join.getRight(),
+ join.getJoinType(),
+ join.isSemiJoinDone());
+ } else {
+ return node;
+ }
+ }
+
+ /** Resolve the RelNode of the sub query in conditions. */
+ private static List<RexNode> resolveSubQuery(
+ Supplier<List<RexNode>> conditionSupplier, Function<RelNode,
RelNode> resolver) {
+ return conditionSupplier.get().stream()
+ .map(
+ rexNode ->
+ rexNode.accept(
+ new RexShuttle() {
+ @Override
+ public RexNode
visitSubQuery(RexSubQuery subQuery) {
+ RelNode oldRel = subQuery.rel;
+ RelNode newRel =
resolver.apply(oldRel);
+ RexSubQuery newSubQuery =
subQuery.clone(newRel);
+ return
super.visitSubQuery(newSubQuery);
+ }
+ }))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Clear the join hints and lookup hints on some nodes where these hints
should not be attached.
+ */
+ public static RelNode clearJoinLookupHintsOnUnmatchedNodes(RelNode root) {
+ return root.accept(
+ new ClearJoinLookupHintsOnUnmatchedNodesShuttle(
+ root.getCluster().getHintStrategies()));
+ }
+
+ /**
+ * Clear the invalid join hints and lookup hints in the unmatched nodes.
For example, a join
+ * hint may be attached in the Project node at first. After accepting this
shuttle, the join
+ * hint in the Project node will be clear.
+ *
+ * <p>See more at {@code FlinkHintStrategies}.
+ *
+ * <p>Tips, hints about view and alias will not be cleared.
+ */
+ private static class ClearJoinLookupHintsOnUnmatchedNodesShuttle extends
RelHomogeneousShuttle {
Review Comment:
Nit: `ClearJoinLookupHintsOnUnmatchedNodesShuttle` sounds like it is aiming
at the lookup join hint specifically. How about
`ClearJoinHintOnUnmatchedNodesShuttle`?
And I understand it's better not to treat it as an inner class.
I also noticed that the `XXHintsShuttle` and `XXHintShuttle` naming patterns are
used interchangeably (e.g. `ResetHintsShuttle` and
`ClearJoinLookupHintsOnUnmatchedNodesShuttle` vs. `JoinHintRelShuttle`,
`CapitalizeJoinHintShuttle` and `ClearJoinHintWithInvalidPropagationShuttle`).
It would be better to unify the naming convention.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -234,4 +232,112 @@ private RelNode visitBiRel(BiRel biRel) {
}
}
}
+
+ /** Resolve the RelNode of the sub query in the node and return a new
node. */
+ public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
+ if (node instanceof LogicalProject) {
+ LogicalProject project = (LogicalProject) node;
+ List<RexNode> newProjects = resolveSubQuery(project::getProjects,
resolver);
+ return project.copy(
+ project.getTraitSet(), project.getInput(), newProjects,
project.getRowType());
+
+ } else if (node instanceof LogicalFilter) {
+ LogicalFilter filter = (LogicalFilter) node;
+ RexNode newCondition =
+ resolveSubQuery(
+ () ->
Collections.singletonList(filter.getCondition()),
+ resolver)
+ .get(0);
+ return filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
+ } else if (node instanceof LogicalJoin) {
+ LogicalJoin join = (LogicalJoin) node;
+ RexNode newCondition =
+ resolveSubQuery(() ->
Collections.singletonList(join.getCondition()), resolver)
+ .get(0);
+ return join.copy(
+ join.getTraitSet(),
+ newCondition,
+ join.getLeft(),
+ join.getRight(),
+ join.getJoinType(),
+ join.isSemiJoinDone());
+ } else {
+ return node;
+ }
+ }
+
+ /** Resolve the RelNode of the sub query in conditions. */
+ private static List<RexNode> resolveSubQuery(
+ Supplier<List<RexNode>> conditionSupplier, Function<RelNode,
RelNode> resolver) {
+ return conditionSupplier.get().stream()
+ .map(
+ rexNode ->
+ rexNode.accept(
+ new RexShuttle() {
+ @Override
+ public RexNode
visitSubQuery(RexSubQuery subQuery) {
+ RelNode oldRel = subQuery.rel;
+ RelNode newRel =
resolver.apply(oldRel);
+ RexSubQuery newSubQuery =
subQuery.clone(newRel);
+ return
super.visitSubQuery(newSubQuery);
+ }
+ }))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Clear the join hints and lookup hints on some nodes where these hints
should not be attached.
+ */
+ public static RelNode clearJoinLookupHintsOnUnmatchedNodes(RelNode root) {
+ return root.accept(
+ new ClearJoinLookupHintsOnUnmatchedNodesShuttle(
+ root.getCluster().getHintStrategies()));
+ }
+
+ /**
+ * Clear the invalid join hints and lookup hints in the unmatched nodes.
For example, a join
+ * hint may be attached in the Project node at first. After accepting this
shuttle, the join
+ * hint in the Project node will be clear.
Review Comment:
Nit: "will be clear" should be "will be cleared".
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -234,4 +232,112 @@ private RelNode visitBiRel(BiRel biRel) {
}
}
}
+
+ /** Resolve the RelNode of the sub query in the node and return a new
node. */
+ public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
+ if (node instanceof LogicalProject) {
+ LogicalProject project = (LogicalProject) node;
+ List<RexNode> newProjects = resolveSubQuery(project::getProjects,
resolver);
+ return project.copy(
+ project.getTraitSet(), project.getInput(), newProjects,
project.getRowType());
+
+ } else if (node instanceof LogicalFilter) {
+ LogicalFilter filter = (LogicalFilter) node;
+ RexNode newCondition =
+ resolveSubQuery(
+ () ->
Collections.singletonList(filter.getCondition()),
+ resolver)
+ .get(0);
+ return filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
+ } else if (node instanceof LogicalJoin) {
+ LogicalJoin join = (LogicalJoin) node;
+ RexNode newCondition =
+ resolveSubQuery(() ->
Collections.singletonList(join.getCondition()), resolver)
+ .get(0);
+ return join.copy(
+ join.getTraitSet(),
+ newCondition,
+ join.getLeft(),
+ join.getRight(),
+ join.getJoinType(),
+ join.isSemiJoinDone());
+ } else {
+ return node;
+ }
+ }
+
+ /** Resolve the RelNode of the sub query in conditions. */
+ private static List<RexNode> resolveSubQuery(
+ Supplier<List<RexNode>> conditionSupplier, Function<RelNode,
RelNode> resolver) {
+ return conditionSupplier.get().stream()
+ .map(
+ rexNode ->
+ rexNode.accept(
+ new RexShuttle() {
+ @Override
+ public RexNode
visitSubQuery(RexSubQuery subQuery) {
+ RelNode oldRel = subQuery.rel;
+ RelNode newRel =
resolver.apply(oldRel);
+ RexSubQuery newSubQuery =
subQuery.clone(newRel);
+ return
super.visitSubQuery(newSubQuery);
+ }
+ }))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Clear the join hints and lookup hints on some nodes where these hints
should not be attached.
+ */
+ public static RelNode clearJoinLookupHintsOnUnmatchedNodes(RelNode root) {
+ return root.accept(
+ new ClearJoinLookupHintsOnUnmatchedNodesShuttle(
+ root.getCluster().getHintStrategies()));
+ }
+
+ /**
+ * Clear the invalid join hints and lookup hints in the unmatched nodes.
For example, a join
+ * hint may be attached in the Project node at first. After accepting this
shuttle, the join
+ * hint in the Project node will be clear.
+ *
+ * <p>See more at {@code FlinkHintStrategies}.
Review Comment:
```suggestion
* <p>See more at {@link FlinkHintStrategies}.
```
for better navigation.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -234,4 +232,112 @@ private RelNode visitBiRel(BiRel biRel) {
}
}
}
+
+ /** Resolve the RelNode of the sub query in the node and return a new
node. */
+ public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
Review Comment:
Nit
```java
public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
if (node instanceof LogicalProject) {
LogicalProject project = (LogicalProject) node;
List<RexNode> newProjects =
project.getProjects().stream()
.map(p -> resolveSubQuery(p, resolver))
.collect(Collectors.toList());
return project.copy(
project.getTraitSet(), project.getInput(), newProjects,
project.getRowType());
} else if (node instanceof LogicalFilter) {
LogicalFilter filter = (LogicalFilter) node;
RexNode newCondition = resolveSubQuery(filter.getCondition(),
resolver);
return filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
} else if (node instanceof LogicalJoin) {
LogicalJoin join = (LogicalJoin) node;
RexNode newCondition = resolveSubQuery(join.getCondition(),
resolver);
return join.copy(
join.getTraitSet(),
newCondition,
join.getLeft(),
join.getRight(),
join.getJoinType(),
join.isSemiJoinDone());
} else {
return node;
}
}
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java:
##########
@@ -21,7 +21,24 @@
import java.util.List;
import java.util.Locale;
-/** Currently available join strategies and corresponding join hint names. */
+/**
+ * Currently available join strategies and corresponding join hint names.
+ *
+ * <p>The current process about join hint is following:
+ *
+ * <ol>
+ * <li>resolve the propagation about the join hints by calcite
+ * <ol>
+ * <li>propagate the join hints from sink to source and in sub-query
+ * <li>capitalize join hint to let all join hints in upper case
+ * <li>clear the join hints that are propagated into query block
wrongly
+ * <li>clear the join hints that attaches in the unmatched nodes such
as Project
+ * </ol>
+ * <li>validate the join hints and replace the table name in hints with LEFT
or RIGHT
+ * <li>clear the query block alias from sink to source
+ * <li>consume join hints in some rules
+ * </ol>
+ */
Review Comment:
```suggestion
/**
* Currently available join strategies and corresponding join hint names.
*
* <p>The process for handling join hints is as follows:
*
* <ol>
* <li>Resolve join hint propagation:
* <ol>
* <li>The join hints are resolved using Calcite's functionality to
propagate them from the
* sink to the source and within sub-queries
* <li>Capitalize join hints: All join hints are capitalized to
ensure consistency, as they
* are expected to be in uppercase.
* <li>Clear incorrectly propagated join hints: Any join hints that
have been mistakenly
* propagated into the query block are cleared.
* <li>Clear join hints from unmatched nodes: Join hints attached to
unmatched nodes, such
* as {@link org.apache.calcite.rel.core.Project}, are also
cleared.
* </ol>
* <li>Validate and modify join hints: The join hints are validated, and
the table names in the
* hints are replaced with LEFT or RIGHT to indicate the join input
ordinal.
* <li>Clear query block aliases: The query block aliases are cleared from
the sink to the source.
* <li>Consume join hints in applicable rules: Finally, the join hints are
consumed in specific
* rules where they are relevant.
* </ol>
*/
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -188,19 +195,10 @@ public static RelNode capitalizeJoinHints(RelNode root) {
return root.accept(new CapitalizeJoinHintShuttle());
}
- private static class CapitalizeJoinHintShuttle extends RelShuttleImpl {
+ private static class CapitalizeJoinHintShuttle extends JoinHintRelShuttle {
Review Comment:
Nit: I understand it's better not to treat it as an inner class.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -234,4 +232,112 @@ private RelNode visitBiRel(BiRel biRel) {
}
}
}
+
+ /** Resolve the RelNode of the sub query in the node and return a new
node. */
+ public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
+ if (node instanceof LogicalProject) {
+ LogicalProject project = (LogicalProject) node;
+ List<RexNode> newProjects = resolveSubQuery(project::getProjects,
resolver);
+ return project.copy(
+ project.getTraitSet(), project.getInput(), newProjects,
project.getRowType());
+
+ } else if (node instanceof LogicalFilter) {
+ LogicalFilter filter = (LogicalFilter) node;
+ RexNode newCondition =
+ resolveSubQuery(
+ () ->
Collections.singletonList(filter.getCondition()),
+ resolver)
+ .get(0);
+ return filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
+ } else if (node instanceof LogicalJoin) {
+ LogicalJoin join = (LogicalJoin) node;
+ RexNode newCondition =
+ resolveSubQuery(() ->
Collections.singletonList(join.getCondition()), resolver)
+ .get(0);
+ return join.copy(
+ join.getTraitSet(),
+ newCondition,
+ join.getLeft(),
+ join.getRight(),
+ join.getJoinType(),
+ join.isSemiJoinDone());
+ } else {
+ return node;
+ }
+ }
+
+ /** Resolve the RelNode of the sub query in conditions. */
+ private static List<RexNode> resolveSubQuery(
+ Supplier<List<RexNode>> conditionSupplier, Function<RelNode,
RelNode> resolver) {
+ return conditionSupplier.get().stream()
+ .map(
+ rexNode ->
+ rexNode.accept(
+ new RexShuttle() {
+ @Override
+ public RexNode
visitSubQuery(RexSubQuery subQuery) {
+ RelNode oldRel = subQuery.rel;
+ RelNode newRel =
resolver.apply(oldRel);
+ RexSubQuery newSubQuery =
subQuery.clone(newRel);
+ return
super.visitSubQuery(newSubQuery);
+ }
+ }))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Clear the join hints and lookup hints on some nodes where these hints
should not be attached.
+ */
+ public static RelNode clearJoinLookupHintsOnUnmatchedNodes(RelNode root) {
+ return root.accept(
+ new ClearJoinLookupHintsOnUnmatchedNodesShuttle(
+ root.getCluster().getHintStrategies()));
+ }
+
+ /**
+ * Clear the invalid join hints and lookup hints in the unmatched nodes.
For example, a join
+ * hint may be attached in the Project node at first. After accepting this
shuttle, the join
+ * hint in the Project node will be clear.
+ *
+ * <p>See more at {@code FlinkHintStrategies}.
+ *
+ * <p>Tips, hints about view and alias will not be cleared.
+ */
+ private static class ClearJoinLookupHintsOnUnmatchedNodesShuttle extends
RelHomogeneousShuttle {
+
+ private final HintStrategyTable hintStrategyTable;
+
+ public ClearJoinLookupHintsOnUnmatchedNodesShuttle(HintStrategyTable
hintStrategyTable) {
+ this.hintStrategyTable = hintStrategyTable;
+ }
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (FlinkRelOptUtil.containsSubQuery(other)) {
+ other = resolveSubQuery(other, relNode ->
relNode.accept(this));
+ }
+
+ if (other instanceof Hintable) {
+ List<RelHint> originHints = ((Hintable) other).getHints();
+ List<RelHint> joinLookupHints =
+ originHints.stream()
+ .filter(
+ h ->
+
JoinStrategy.isJoinStrategy(h.hintName)
+ ||
JoinStrategy.isLookupHint(h.hintName))
+ .collect(Collectors.toList());
+
+ List<RelHint> remainHints = new ArrayList<>(originHints);
Review Comment:
Nit: It would be better to add some comments to improve code
readability, for example:
```
// First, classify the hints and separate out the join hints.
// Then, use hintStrategyTable#apply to determine whether the join hint can
be attached to the current node.
// If it cannot be attached, it means that the join hint on the current node
needs to be removed.
// As a result, the remaining hints will be attached.
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -234,4 +232,112 @@ private RelNode visitBiRel(BiRel biRel) {
}
}
}
+
+ /** Resolve the RelNode of the sub query in the node and return a new
node. */
+ public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
+ if (node instanceof LogicalProject) {
+ LogicalProject project = (LogicalProject) node;
+ List<RexNode> newProjects = resolveSubQuery(project::getProjects,
resolver);
+ return project.copy(
+ project.getTraitSet(), project.getInput(), newProjects,
project.getRowType());
+
+ } else if (node instanceof LogicalFilter) {
+ LogicalFilter filter = (LogicalFilter) node;
+ RexNode newCondition =
+ resolveSubQuery(
+ () ->
Collections.singletonList(filter.getCondition()),
+ resolver)
+ .get(0);
+ return filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
+ } else if (node instanceof LogicalJoin) {
+ LogicalJoin join = (LogicalJoin) node;
+ RexNode newCondition =
+ resolveSubQuery(() ->
Collections.singletonList(join.getCondition()), resolver)
+ .get(0);
+ return join.copy(
+ join.getTraitSet(),
+ newCondition,
+ join.getLeft(),
+ join.getRight(),
+ join.getJoinType(),
+ join.isSemiJoinDone());
+ } else {
+ return node;
+ }
+ }
+
+ /** Resolve the RelNode of the sub query in conditions. */
+ private static List<RexNode> resolveSubQuery(
+ Supplier<List<RexNode>> conditionSupplier, Function<RelNode,
RelNode> resolver) {
+ return conditionSupplier.get().stream()
+ .map(
+ rexNode ->
+ rexNode.accept(
+ new RexShuttle() {
+ @Override
+ public RexNode
visitSubQuery(RexSubQuery subQuery) {
+ RelNode oldRel = subQuery.rel;
+ RelNode newRel =
resolver.apply(oldRel);
+ RexSubQuery newSubQuery =
subQuery.clone(newRel);
+ return
super.visitSubQuery(newSubQuery);
+ }
+ }))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Clear the join hints and lookup hints on some nodes where these hints
should not be attached.
+ */
+ public static RelNode clearJoinLookupHintsOnUnmatchedNodes(RelNode root) {
+ return root.accept(
+ new ClearJoinLookupHintsOnUnmatchedNodesShuttle(
+ root.getCluster().getHintStrategies()));
+ }
+
+ /**
+ * Clear the invalid join hints and lookup hints in the unmatched nodes.
For example, a join
+ * hint may be attached in the Project node at first. After accepting this
shuttle, the join
+ * hint in the Project node will be clear.
+ *
+ * <p>See more at {@code FlinkHintStrategies}.
+ *
+ * <p>Tips, hints about view and alias will not be cleared.
+ */
+ private static class ClearJoinLookupHintsOnUnmatchedNodesShuttle extends
RelHomogeneousShuttle {
+
+ private final HintStrategyTable hintStrategyTable;
+
+ public ClearJoinLookupHintsOnUnmatchedNodesShuttle(HintStrategyTable
hintStrategyTable) {
+ this.hintStrategyTable = hintStrategyTable;
+ }
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (FlinkRelOptUtil.containsSubQuery(other)) {
+ other = resolveSubQuery(other, relNode ->
relNode.accept(this));
+ }
+
+ if (other instanceof Hintable) {
+ List<RelHint> originHints = ((Hintable) other).getHints();
+ List<RelHint> joinLookupHints =
+ originHints.stream()
+ .filter(
+ h ->
+
JoinStrategy.isJoinStrategy(h.hintName)
+ ||
JoinStrategy.isLookupHint(h.hintName))
Review Comment:
I think `JoinStrategy.isJoinStrategy(h.hintName)` is good enough to check.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -234,4 +232,112 @@ private RelNode visitBiRel(BiRel biRel) {
}
}
}
+
+ /** Resolve the RelNode of the sub query in the node and return a new
node. */
+ public static RelNode resolveSubQuery(RelNode node, Function<RelNode,
RelNode> resolver) {
+ if (node instanceof LogicalProject) {
+ LogicalProject project = (LogicalProject) node;
+ List<RexNode> newProjects = resolveSubQuery(project::getProjects,
resolver);
+ return project.copy(
+ project.getTraitSet(), project.getInput(), newProjects,
project.getRowType());
+
+ } else if (node instanceof LogicalFilter) {
+ LogicalFilter filter = (LogicalFilter) node;
+ RexNode newCondition =
+ resolveSubQuery(
+ () ->
Collections.singletonList(filter.getCondition()),
+ resolver)
+ .get(0);
+ return filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
+ } else if (node instanceof LogicalJoin) {
+ LogicalJoin join = (LogicalJoin) node;
+ RexNode newCondition =
+ resolveSubQuery(() ->
Collections.singletonList(join.getCondition()), resolver)
+ .get(0);
+ return join.copy(
+ join.getTraitSet(),
+ newCondition,
+ join.getLeft(),
+ join.getRight(),
+ join.getJoinType(),
+ join.isSemiJoinDone());
+ } else {
+ return node;
+ }
+ }
+
+ /** Resolve the RelNode of the sub query in conditions. */
+ private static List<RexNode> resolveSubQuery(
+ Supplier<List<RexNode>> conditionSupplier, Function<RelNode,
RelNode> resolver) {
+ return conditionSupplier.get().stream()
+ .map(
+ rexNode ->
+ rexNode.accept(
+ new RexShuttle() {
+ @Override
+ public RexNode
visitSubQuery(RexSubQuery subQuery) {
+ RelNode oldRel = subQuery.rel;
+ RelNode newRel =
resolver.apply(oldRel);
+ RexSubQuery newSubQuery =
subQuery.clone(newRel);
+ return
super.visitSubQuery(newSubQuery);
Review Comment:
I think we can add a reference check to avoid an unnecessary clone.
```java
private static RexNode resolveSubQuery(RexNode origin, Function<RelNode,
RelNode> resolver) {
return origin.accept(
new RexShuttle() {
@Override
public RexNode visitSubQuery(RexSubQuery subQuery) {
RelNode oldRel = subQuery.rel;
RelNode newRel = resolver.apply(oldRel);
if (newRel != oldRel) {
return
super.visitSubQuery(subQuery.clone(newRel));
}
return subQuery;
}
});
}
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java:
##########
@@ -216,18 +213,12 @@ public static RelNode decorrelateQuery(RelNode rootRel,
RelBuilder relBuilder) {
newRootRel = decorrelator.decorrelate(newRootRel);
}
- // Re-propagate the hints.
- newRootRel = RelOptUtil.propagateRelHints(newRootRel, true);
-
// ----- FLINK MODIFICATION BEGIN -----
+ // REASON: hints are already parsed and validated before optimizing,
so should not
+ // re-propagate again here
- // replace all join hints with upper case
- newRootRel = FlinkHints.capitalizeJoinHints(newRootRel);
-
- // clear join hints which are propagated into wrong query block
- // The hint QueryBlockAlias will be added when building a RelNode tree
before. It is used to
- // distinguish the query block in the SQL.
- newRootRel = newRootRel.accept(new
ClearJoinHintWithInvalidPropagationShuttle());
+ // Re-propagate the hints.
+ // newRootRel = RelOptUtil.propagateRelHints(newRootRel, true);
Review Comment:
Nit: since the modification is not needed anymore, can we remove this
section?
##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -650,10 +651,11 @@ public RelRoot convertQuery(SqlNode query, final boolean
needsValidation, final
result = result.accept(new NestedJsonFunctionRelRewriter());
}
- // propagate the hints.
- result = RelOptUtil.propagateRelHints(result, false);
-
// ----- FLINK MODIFICATION BEGIN -----
+ // propagate the hints.
+ // 'FlinkRelOptUtil.propagateRelHints' will also propagate hints not
only in the whole rel
Review Comment:
Nit: `The method FlinkRelOptUtil#propagateRelHints not only finds and
propagates hints throughout the entire rel tree but also within subqueries.`
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala:
##########
@@ -524,4 +530,221 @@ object FlinkRelOptUtil {
}
}
}
+
+ // ----- The following is mainly copied from RelOptUtil -----
+ // ----- Copied Line: 537 ~ 743 -----
+ // ----- Modified Line: 642 ~ 664 -----
+ /**
+ * Propagates the relational expression hints from root node to leaf node.
+ *
+ * @param rel
+ * The relational expression
+ * @param reset
+ * Flag saying if to reset the existing hints before the propagation
+ * @return
+ * New relational expression with hints propagated
+ */
+ def propagateRelHints(rel: RelNode, reset: Boolean): RelNode = {
+ val node = if (reset) {
+ rel.accept(new ResetHintsShuttle)
+ } else {
+ rel
+ }
+ val shuttle = new
RelHintPropagateShuttle(node.getCluster.getHintStrategies)
+ node.accept(shuttle)
+ }
+
+ /**
+ * A [[RelShuttle]] which resets all the hints of a relational expression to
what they are
+ * originally like.
+ *
+ * <p>This would trigger a reverse transformation of what
[[RelHintPropagateShuttle]] does.
+ *
+ * <p>Transformation rules:
+ *
+ * <ul> <li>Project: remove the hints that have non-empty inherit path
(which means the hint was
+ * not originally declared from it); <li>Aggregate: remove the hints that
have non-empty inherit
+ * path; <li>Join: remove all the hints; <li>TableScan: remove the hints
that have non-empty
+ * inherit path. </ul>
+ */
+ private class ResetHintsShuttle extends RelHomogeneousShuttle {
+ override def visit(node: RelNode): RelNode = {
+ var finalNode = visitChildren(node)
+ if (node.isInstanceOf[Hintable]) {
+ finalNode =
ResetHintsShuttle.resetHints(finalNode.asInstanceOf[Hintable])
+ }
+ finalNode
+ }
+ }
+
+ private object ResetHintsShuttle {
+ private def resetHints(hintable: Hintable): RelNode = if
(hintable.getHints.size > 0) {
+ val resetHints: util.List[RelHint] = hintable.getHints
+ .filter((hint: RelHint) => hint.inheritPath.size == 0)
+ .toList
+ hintable.withHints(resetHints)
+ } else {
+ hintable.asInstanceOf[RelNode]
+ }
+ }
+
+ /**
+ * A [[RelShuttle]] which propagates all the hints of relational expression
to their children
+ * nodes.
+ *
+ * <p>Given a plan:
+ *
+ * {{{
+ * Filter (Hint1)
+ * |
+ * Join
+ * / \
+ * Scan Project (Hint2)
+ * |
+ * Scan2
+ * }}}
+ *
+ * <p>Every hint has a [[inheritPath]] (integers list) which records its
propagate path, number
+ * `0` represents the hint is propagated from the first(left) child, number
`1` represents the
+ * hint is propagated from the second(right) child, so the plan would have
hints path as follows
+ * (assumes each hint can be propagated to all child nodes):
+ *
+ * <ul> <li>Filter would have hints {Hint1[]}</li> <li>Join would have hints
{Hint1[0]}</li>
+ * <li>Scan would have hints {Hint1[0, 0]}</li> <li>Project would have hints
{Hint1[0,1],
+ * Hint2[]}</li> <li>Scan2 would have hints {[Hint1[0, 1, 0], Hint2[0]}</li>
</ul>
+ */
+ private class RelHintPropagateShuttle private[plan] (
+ /** The hint strategies to decide if a hint should be attached to a
relational expression. */
+ val hintStrategies: HintStrategyTable)
+ extends RelHomogeneousShuttle {
+
+ /** Stack recording the hints and its current inheritPath. */
+ final private val inheritPaths =
+ new util.ArrayDeque[Pair[util.List[RelHint], util.Deque[Integer]]]
+
+ /** Visits a particular child of a parent. */
+ override protected def visitChild(parent: RelNode, i: Int, child:
RelNode): RelNode = {
+ inheritPaths.forEach(
+ (inheritPath: Pair[util.List[RelHint], util.Deque[Integer]]) =>
inheritPath.right.push(i))
+ try {
+ val child2 = child.accept(this)
+ if (child2 ne child) {
+ val newInputs = new util.ArrayList[RelNode](parent.getInputs)
+ newInputs.set(i, child2)
+ return parent.copy(parent.getTraitSet, newInputs)
+ }
+ parent
+ } finally
+ inheritPaths.forEach(
+ (inheritPath: Pair[util.List[RelHint], util.Deque[Integer]]) =>
inheritPath.right.pop)
+ }
+
+ // FLINK MODIFICATION BEGIN
+ // let hints propagating in sub query
+
+ override def visit(other: RelNode): RelNode = {
+ val node = tryToPropagateHintsInSubQuery(other)
+ if (node.isInstanceOf[Hintable]) {
+ visitHintable(node)
+ } else {
+ visitChildren(node)
+ }
+ }
+
+ private def tryToPropagateHintsInSubQuery(node: RelNode): RelNode = {
+ if (containsSubQuery(node)) {
+ FlinkHints.resolveSubQuery(
+ node,
+ relNode => FlinkRelOptUtil.propagateRelHints(relNode, reset = false))
Review Comment:
I think `relNode => relNode.accept(this)` is good enough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]