This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 271266a9cd5 [FLINK-38731][table] Add support for MULTI_JOIN hint
271266a9cd5 is described below
commit 271266a9cd5e4b5d3d564c956cfcda76cb60fa36
Author: Gustavo de Morais <[email protected]>
AuthorDate: Wed Nov 26 11:13:00 2025 +0000
[FLINK-38731][table] Add support for MULTI_JOIN hint
---
docs/content/docs/dev/table/sql/queries/hints.md | 43 ++++
docs/content/docs/dev/table/tuning.md | 34 ++-
.../table/planner/hint/FlinkHintStrategies.java | 5 +
.../flink/table/planner/hint/JoinStrategy.java | 11 +-
.../plan/rules/logical/JoinToMultiJoinRule.java | 40 +++-
.../plan/optimize/program/FlinkStreamProgram.scala | 30 ++-
.../nodes/exec/stream/MultiJoinSemanticTests.java | 3 +-
.../nodes/exec/stream/MultiJoinTestPrograms.java | 30 +++
.../planner/plan/stream/sql/MultiJoinTest.java | 127 +++++++++++
.../plan/hints/batch/BroadcastJoinHintTest.xml | 4 +-
.../plan/hints/batch/NestLoopJoinHintTest.xml | 4 +-
.../plan/hints/batch/ShuffleHashJoinHintTest.xml | 4 +-
.../plan/hints/batch/ShuffleMergeJoinHintTest.xml | 4 +-
.../optimize/ClearQueryBlockAliasResolverTest.xml | 4 +-
.../plan/optimize/QueryHintsResolverTest.xml | 4 +-
.../planner/plan/stream/sql/MultiJoinTest.xml | 250 +++++++++++++++++++++
16 files changed, 558 insertions(+), 39 deletions(-)
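As a reading aid for the JoinToMultiJoinRule change below, the new gating logic can be sketched in isolation: a join is considered for merging when either the `table.optimizer.multi-join.enabled` option is on or a MULTI_JOIN hint with a non-empty table list is attached to it. This is a minimal, self-contained sketch and not the planner code: `MultiJoinDecisionSketch`, its nested `Hint` class, and `shouldMerge` are hypothetical names that stand in for the hint type and the rule's private helpers, and the rule's other preconditions (such as the join-type check) are left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified model of the "config OR hint" check; not Flink API. */
final class MultiJoinDecisionSketch {

    /** Stand-in for a planner hint: a name plus its list options (table names or aliases). */
    static final class Hint {
        final String hintName;
        final List<String> listOptions;

        Hint(String hintName, List<String> listOptions) {
            this.hintName = hintName;
            this.listOptions = listOptions;
        }
    }

    static final String MULTI_JOIN = "MULTI_JOIN";

    /** True if a MULTI_JOIN hint with at least one listed table is present. */
    static boolean hasMultiJoinHint(List<Hint> hints) {
        return hints.stream()
                .anyMatch(h -> MULTI_JOIN.equals(h.hintName) && !h.listOptions.isEmpty());
    }

    /** The join qualifies if the global option is on OR the hint is present. */
    static boolean shouldMerge(boolean enabledViaConfig, List<Hint> hints) {
        return enabledViaConfig || hasMultiJoinHint(hints);
    }

    public static void main(String[] args) {
        List<Hint> hinted =
                Collections.singletonList(new Hint(MULTI_JOIN, Arrays.asList("t1", "t2", "t3")));
        List<Hint> unhinted = Collections.emptyList();

        System.out.println("config off, hint present: " + shouldMerge(false, hinted));
        System.out.println("config off, no hint:      " + shouldMerge(false, unhinted));
        System.out.println("config on,  no hint:      " + shouldMerge(true, unhinted));
    }
}
```

Because the check now runs per join inside the rule, the MULTI_JOIN rule set has to be registered in the stream program unconditionally, which is what the FlinkStreamProgram.scala hunk does.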
diff --git a/docs/content/docs/dev/table/sql/queries/hints.md
b/docs/content/docs/dev/table/sql/queries/hints.md
index e60d6dbc6ea..3cb34267bcc 100644
--- a/docs/content/docs/dev/table/sql/queries/hints.md
+++ b/docs/content/docs/dev/table/sql/queries/hints.md
@@ -279,6 +279,49 @@ SELECT /*+ NEST_LOOP(t1) */ * FROM t1 JOIN t2 ON t1.id =
t2.id;
SELECT /*+ NEST_LOOP(t1, t3) */ * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON
t1.id = t3.id;
```
+#### MULTI_JOIN
+
+{{< label Streaming >}}
+
+`MULTI_JOIN` suggests that Flink use the MultiJoin operator to process multiple regular joins simultaneously. This join hint is recommended when you have multiple joins that share at least one common join key and that suffer from large intermediate state or record amplification. The MultiJoin operator eliminates intermediate state by processing joins across multiple input streams simultaneously, which can significantly reduce state size and improve performance in some cases.
+
+For more details on the MultiJoin operator, including when to use it and
configuration options, see [Multiple Regular Joins]({{< ref
"docs/dev/table/tuning" >}}#multiple-regular-joins).
+
+{{< hint info >}}
+Note:
+- The MULTI_JOIN hint can specify table names or table aliases. If a table has
an alias, the hint must use the alias name.
+- At least one key must be shared between the join conditions for the
MultiJoin operator to be applied.
+- When specified, the MULTI_JOIN hint applies to the tables listed in the hint
within the current query block.
+{{< /hint >}}
+
+##### Examples
+
+```sql
+CREATE TABLE t1 (id BIGINT, name STRING, age INT) WITH (...);
+CREATE TABLE t2 (id BIGINT, name STRING, age INT) WITH (...);
+CREATE TABLE t3 (id BIGINT, name STRING, age INT) WITH (...);
+
+-- Flink will use the MultiJoin operator for the three-way join.
+SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+
+-- Using table names instead of aliases.
+SELECT /*+ MULTI_JOIN(Users, Orders, Payments) */ * FROM Users
+INNER JOIN Orders ON Users.user_id = Orders.user_id
+INNER JOIN Payments ON Users.user_id = Payments.user_id;
+
+-- Partial match: only t1 and t2 will use MultiJoin; t3 will use a regular join.
+SELECT /*+ MULTI_JOIN(t1, t2) */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+
+-- Combining MULTI_JOIN with STATE_TTL hint.
+SELECT /*+ MULTI_JOIN(t1, t2, t3), STATE_TTL('t1'='1d', 't2'='2d', 't3'='12h')
*/ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+```
+
#### LOOKUP
{{< label Streaming >}}
diff --git a/docs/content/docs/dev/table/tuning.md
b/docs/content/docs/dev/table/tuning.md
index b600dd7f908..af099b6395d 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -307,9 +307,9 @@ MiniBatch optimization is disabled by default for regular
join. In order to enab
{{< label Streaming >}}
-Streaming Flink jobs with multiple non-temporal regular joins often experience
operational instability and performance degradation due to large state sizes.
This is often because the intermediate state created by a chain of joins is
much larger than the input state itself. In Flink 2.1, we introduce a new
multi-join operator, an optimization designed to significantly reduce state
size and improve performance for join pipelines that involve record
amplification and large intermediate stat [...]
+Streaming Flink jobs with multiple non-temporal regular joins often experience
operational instability and performance degradation due to large state sizes.
This is often because the intermediate state created by a chain of joins is
much larger than the input state itself. In Flink 2.1, we introduce a new
multi-join operator, an optimization designed to significantly reduce state
size and improve performance for join pipelines that involve record
amplification and large intermediate stat [...]
-In most joins, a significant portion of processing time is spent fetching
records from the state. The efficiency of the MultiJoin operator largely
depends on the size of this intermediate state. In a common scenario where a
pipeline experiences record amplification—meaning each join produces more data
and records than the previous one, the MultiJoin operator is more efficient.
This is because it keeps the state on which the operator interacts much
smaller, leading to a more stable operat [...]
+In most joins, a significant portion of processing time is spent fetching
records from the state. The efficiency of the MultiJoin operator largely
depends on the size of this intermediate state and the selectivity of the
common join key(s). In a common scenario where a pipeline experiences record
amplification—meaning each join produces more data and records than the
previous one, the MultiJoin operator is more efficient. This is because it
keeps the state on which the operator interacts [...]
### The MultiJoin Operator
The main benefits of the MultiJoin operator are:
@@ -318,21 +318,39 @@ The main benefits of the MultiJoin operator are:
2) Improved performance for chained joins with record amplification.
3) Improved stability: linear state growth with amount of records processed,
instead of polynomial growth with binary joins.
-Also, pipelines with MultiJoin instead of binary joins usually have faster
initialization and recovery times due to smaller state and fewer amount of
nodes.
+Also, pipelines with MultiJoin instead of binary joins usually have faster
initialization and recovery times due to smaller state and fewer nodes.
### When to enable the MultiJoin?
-If your job has multiple joins that share at least one common join key, and
you observe that the intermediate state in the intermediate joins is larger
than the inputs sources, consider enabling the MultiJoin operator.
+If your job has multiple joins that share at least one common join key, and
you observe that the intermediate state in the intermediate joins is larger
than the input sources, consider enabling the MultiJoin operator.
+
+Recommended use cases:
+- The common join key(s) have high selectivity (the number of records per key is small)
+- The statement has several chained joins and considerable intermediate state
+- There is no considerable data skew on the common join key(s)
+- The joins generate large state (50+ GB)
+
+If your common join key(s) exhibit low selectivity (i.e., a high number of
rows sharing the same key value), the MultiJoin operator's required
recomputation of the intermediate state can severely impact performance. In
such scenarios, binary joins are recommended, as these will partition the data
using all join keys.
### How to enable the MultiJoin?
-To enable this optimization, set the following configuration
+To enable this optimization globally for all eligible joins, set the following
configuration:
```sql
SET 'table.optimizer.multi-join.enabled' = 'true';
```
-Important: This is currently in an experimental state - there are open
optimizations and breaking changes might be implemented in this version. We
currently support only streaming INNER/LEFT joins. Support for RIGHT joins will
be added soon. Due to records partitioning, you need at least one key that is
shared between the join conditions, see:
+Alternatively, you can enable the MultiJoin operator for specific tables using
the `MULTI_JOIN` hint:
+
+```sql
+SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+```
+
+The hint approach allows you to selectively apply the MultiJoin optimization
to specific query blocks without enabling it globally. You can specify either
table names or table aliases in the hint. For more details on the MULTI_JOIN
hint, see [Join Hints]({{< ref "docs/dev/table/sql/queries/hints"
>}}#multi_join).
+
+Important: This feature is currently experimental; further optimizations and breaking changes might still be implemented. We currently support only streaming INNER/LEFT joins. Due to record partitioning, you need at least one key that is shared between the join conditions, see:
- Supported: A JOIN B ON A.key = B.key JOIN C ON A.key = C.key (Partition by
key)
- Supported: A JOIN B ON A.key = B.key JOIN C ON B.key = C.key (Partition by
key via transitivity)
@@ -349,9 +367,9 @@ For this 10-way join above, involving record amplification,
we've observed signi
- Performance: 2x to over 100x+ increase in processed records when both at
100% busyness.
- State Size: 3x to over 1000x+ smaller as intermediate state grows.
-The total state is always smaller with the MultiJoin operator. In this case,
the performance is initially the same, but as the intermediate state grows, the
performance of binary joins degrade and the multi join remains stable and
outperforms.
+The total state is always smaller with the MultiJoin operator. In this case, the performance is initially the same, but as the intermediate state grows, the performance of binary joins degrades while the MultiJoin operator remains stable and outperforms them.
-This general benchmark for the 10-way join was run with the following
configuration: 10 upsert kafka topics, 10 parallelism, 1 record per second per
topic. We used rocksdb with unaligned checkpoints and with incremental
checkpoints. Each job ran in one TaskManager containing 8GB process memory, 1GB
off-heap memory and 20% network memory. The JobManager had 4GB process memory.
The host machine contained a M1 processor chip, 32GB RAM and 1TB SSD. The sink
uses a blackhole connector so we o [...]
+This general benchmark for the 10-way join was run with the following
configuration: 1 record per tenant_id (high selectivity), 10 upsert kafka
topics, 10 parallelism, 1 record per second per topic. We used rocksdb with
unaligned checkpoints and with incremental checkpoints. Each job ran in one
TaskManager containing 8GB process memory, 1GB off-heap memory and 20% network
memory. The JobManager had 4GB process memory. The host machine contained a M1
processor chip, 32GB RAM and 1TB SSD. [...]
```sql
INSERT INTO JoinResultsMJ
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
index c83e0affef4..33e6d4346dd 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
@@ -121,6 +121,11 @@ public abstract class FlinkHintStrategies {
HintPredicates.CORRELATE,
HintPredicates.JOIN))
.optionChecker(LOOKUP_NON_EMPTY_KV_OPTION_CHECKER)
.build())
+ .hintStrategy(
+ JoinStrategy.MULTI_JOIN.getJoinHintName(),
+ HintStrategy.builder(HintPredicates.JOIN)
+ .optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
+ .build())
.hintStrategy(
StateTtlHint.STATE_TTL.getHintName(),
HintStrategy.builder(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
index c7aff224590..0ded47cb222 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
@@ -48,7 +48,14 @@ public enum JoinStrategy {
NEST_LOOP("NEST_LOOP"),
/** Instructs the optimizer to use lookup join strategy. Only accept
key-value hint options. */
- LOOKUP("LOOKUP");
+ LOOKUP("LOOKUP"),
+
+ /**
+ * Instructs the optimizer to use multi-way join strategy for streaming
queries. This hint
+ * allows specifying multiple tables to be joined together in a single
{@link
+ *
org.apache.flink.table.runtime.operators.join.stream.StreamingMultiJoinOperator}.
+ */
+ MULTI_JOIN("MULTI_JOIN");
private final String joinHintName;
@@ -83,6 +90,8 @@ public enum JoinStrategy {
return options.size() > 0;
case LOOKUP:
return null == options || options.size() == 0;
+ case MULTI_JOIN:
+ return options.size() > 0;
}
return false;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
index 81f27d29788..1712176f1f7 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
@@ -18,12 +18,16 @@
package org.apache.flink.table.planner.plan.rules.logical;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinStrategy;
import org.apache.flink.table.planner.hint.StateTtlHint;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMultiJoin;
import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
import
org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor;
import
org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
import org.apache.flink.table.types.logical.RowType;
@@ -170,7 +174,41 @@ public class JoinToMultiJoinRule extends
RelRule<JoinToMultiJoinRule.Config>
return false;
}
- return origJoin.getJoinType().projectsRight();
+ if (!origJoin.getJoinType().projectsRight()) {
+ return false;
+ }
+
+ // Enable multi-join if either the config option is enabled OR the MULTI_JOIN hint is present
+ return isEnabledViaConfig(origJoin) || hasMultiJoinHint(origJoin);
+ }
+
+ /**
+ * Checks if multi-join optimization is enabled via configuration.
+ *
+ * @param join the join node
+ * @return true if TABLE_OPTIMIZER_MULTI_JOIN_ENABLED is set to true
+ */
+ private boolean isEnabledViaConfig(Join join) {
+ final TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(join);
+ return
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED);
+ }
+
+ /**
+ * Checks if the MULTI_JOIN hint is present on the join node.
+ *
+ * <p>Note: By the time this rule sees the join, the QueryHintsResolver
has already validated
+ * the hint. If the hint is present with valid options, it means both
sides of this join were
+ * mentioned in the original hint and have been validated.
+ *
+ * @param join the join node
+ * @return true if MULTI_JOIN hint is present and valid
+ */
+ private boolean hasMultiJoinHint(Join join) {
+ return join.getHints().stream()
+ .anyMatch(
+ hint ->
+
JoinStrategy.MULTI_JOIN.getJoinHintName().equals(hint.hintName)
+ && !hint.listOptions.isEmpty());
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
index e0e2481d5a2..8ec32c242fa 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
@@ -233,22 +233,20 @@ object FlinkStreamProgram {
}
// multi-join
- if
(tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED)) {
- chainedProgram.addLast(
- MULTI_JOIN,
- FlinkGroupProgramBuilder
- .newBuilder[StreamOptimizeContext]
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder
-
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.MULTI_JOIN_RULES)
- .build(),
- "merge binary regular joins into MultiJoin"
- )
- .build()
- )
- }
+ chainedProgram.addLast(
+ MULTI_JOIN,
+ FlinkGroupProgramBuilder
+ .newBuilder[StreamOptimizeContext]
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkStreamRuleSets.MULTI_JOIN_RULES)
+ .build(),
+ "merge binary regular joins into MultiJoin"
+ )
+ .build()
+ )
// project rewrite
chainedProgram.addLast(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
index 20d36a0dfda..706e512a3de 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
@@ -54,6 +54,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase {
MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION,
MultiJoinTestPrograms.MULTI_JOIN_TWO_WAY_INNER_JOIN_WITH_WHERE_IN,
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_MULTI_KEY_TYPES,
-
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED);
+
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED,
+
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
index c19a5a28092..9e3e7fa7631 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
@@ -1823,4 +1823,34 @@ public class MultiJoinTestPrograms {
+ "LEFT JOIN Shipments4K AS S ON U.k3 =
S.k3 AND U.k2 > 150 AND U.k4 = S.k4 "
+ "WHERE U.k2 > 50")
.build();
+
+ public static final TableTestProgram
MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT =
+ TableTestProgram.of(
+ "three-way-inner-join-with-hint",
+ "three way inner join using MULTI_JOIN hint")
+
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false)
+ .setupTableSource(USERS_SOURCE)
+ .setupTableSource(ORDERS_SOURCE)
+ .setupTableSource(PAYMENTS_SOURCE)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(
+ "user_id STRING",
+ "name STRING",
+ "order_id STRING",
+ "payment_id STRING")
+ .consumedValues(
+ "+I[1, Gus, order1, payment1]",
+ "+I[2, Bob, order2, payment2]",
+ "+I[2, Bob, order3, payment2]",
+ "+I[1, Gus, order1, payment3]")
+ .testMaterializedData()
+ .build())
+ .runSql(
+ "INSERT INTO sink "
+ + "SELECT /*+ MULTI_JOIN(u, o, p) */
u.user_id, u.name, o.order_id, p.payment_id "
+ + "FROM Users u "
+ + "INNER JOIN Orders o ON u.user_id =
o.user_id "
+ + "INNER JOIN Payments p ON u.user_id =
p.user_id")
+ .build();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
index 81cc1a10ef9..342a6913df3 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
@@ -271,6 +271,133 @@ public class MultiJoinTest extends TableTestBase {
+ " ON u.user_id = p.user_id");
}
+ @Test
+ void testThreeWayJoinWithMultiJoinHint() {
+ // Disable the config option so that MultiJoin can only be enabled by the hint
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT /*+ MULTI_JOIN(u, o, p) */u.user_id, u.name,
o.order_id, p.payment_id "
+ + "FROM Users u "
+ + "INNER JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id");
+ }
+
+ @Test
+ void testMultiJoinHintCombinedWithStateTtlHint() {
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT /*+ MULTI_JOIN(u, o, p), STATE_TTL(u='1d', o='2d',
p='1h') */u.user_id, u.name, o.order_id, p.payment_id "
+ + "FROM Users u "
+ + "INNER JOIN Orders o ON u.user_id = o.user_id "
+ + "INNER JOIN Payments p ON u.user_id = p.user_id");
+ }
+
+ @Test
+ void testMultiJoinPartialHintCombinedWithStateTtlHint() {
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT /*+ MULTI_JOIN(u, o), STATE_TTL(u='1d', o='2d',
p='1h') */u.user_id, u.name, o.order_id, p.payment_id "
+ + "FROM Users u "
+ + "INNER JOIN Orders o ON u.user_id = o.user_id "
+ + "INNER JOIN Payments p ON u.user_id = p.user_id");
+ }
+
+ @Test
+ void testMultiJoinHintWithTableNames() {
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT /*+ MULTI_JOIN(Users, Orders, Payments)
*/Users.user_id, Users.name, Orders.order_id, Payments.payment_id "
+ + "FROM Users "
+ + "INNER JOIN Orders ON Users.user_id = Orders.user_id
"
+ + "INNER JOIN Payments ON Users.user_id =
Payments.user_id");
+ }
+
+ @Test
+ void testMultiJoinHintPartialMatch() {
+ // First join (u, o) should become MultiJoin with hint
+ // The result then joins with s, which is not in the hint, so a regular join should be used
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT /*+ MULTI_JOIN(u, o) */u.user_id, u.name, o.order_id,
s.location "
+ + "FROM Users u "
+ + "INNER JOIN Orders o ON u.user_id = o.user_id "
+ + "INNER JOIN Shipments s ON u.user_id = s.user_id");
+ }
+
+ @Test
+ void testMultiJoinHintWithMixedNamesAndAliases() {
+ // Hint uses table name for Users, but aliases for others - matching
should work
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT /*+ MULTI_JOIN(Users, o, p) */ Users.user_id,
Users.name, o.order_id, p.payment_id "
+ + "FROM Users "
+ + "INNER JOIN Orders o ON Users.user_id = o.user_id "
+ + "INNER JOIN Payments p ON Users.user_id =
p.user_id");
+ }
+
+ @Test
+ void testChainedMultiJoinHints() {
+ // Tests two separate MULTI_JOIN hints in the same query
+ // Inner subquery has MULTI_JOIN(o, p, s) and outer query joins with
Users
+ // This verifies that multiple MULTI_JOIN hints can be used at
different query levels
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT /*+ MULTI_JOIN(u, subq) */ u.user_id, u.name,
subq.order_id, subq.payment_id, subq.location "
+ + "FROM Users u "
+ + "INNER JOIN ("
+ + " SELECT /*+ MULTI_JOIN(o, p, s) */ o.order_id,
o.user_id u1, p.payment_id, p.user_id u2, s.location, s.user_id u3"
+ + " FROM Orders o "
+ + " INNER JOIN Payments p ON o.user_id = p.user_id "
+ + " INNER JOIN Shipments s ON p.user_id = s.user_id"
+ + ") subq ON u.user_id = subq.u1");
+ }
+
+ @Test
+ void testMultipleMultiJoinHintsInDifferentBranches() {
+ // Tests multiple MULTI_JOIN hints where two separate multi-joins are
joined together
+ // Left side: MULTI_JOIN(u, o)
+ // Right side: MULTI_JOIN(p, s)
+ // Then these two multi-joins are joined together
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+ util.verifyRelPlan(
+ "SELECT left_side.user_id, left_side.order_id,
right_side.payment_id, right_side.location "
+ + "FROM ("
+ + " SELECT /*+ MULTI_JOIN(u, o) */ u.user_id,
o.order_id, o.user_id ouid "
+ + " FROM Users u "
+ + " INNER JOIN Orders o ON u.user_id = o.user_id"
+ + ") left_side "
+ + "INNER JOIN ("
+ + " SELECT /*+ MULTI_JOIN(p, s) */ p.payment_id,
p.user_id, s.location, s.user_id suid "
+ + " FROM Payments p "
+ + " INNER JOIN Shipments s ON p.user_id = s.user_id"
+ + ") right_side "
+ + "ON left_side.user_id = right_side.user_id");
+ }
+
@Test
void testThreeWayLeftOuterJoinExecPlan() {
util.verifyExecPlan(
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
index eadacfb9c55..4b24a4f7a39 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
@@ -1667,12 +1667,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2],
select=[a1, b1, a2, b2])
</TestCase>
<TestCase name="testMultiJoinHints">
<Resource name="sql">
- <![CDATA[select /*+ BROADCAST(T1),
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 =
T2.a2]]>
+ <![CDATA[select /*+ BROADCAST(T1),
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1
join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST
inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0]
options:[T1]][SHUFFLE_MERGE inheritPath:[0] options:[T1]][NEST_LOOP
inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST
inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0]
options:[T1]][SHUFFLE_MERGE inheritPath:[0] options:[T1]][NEST_LOOP
inheritPath:[0] options:[T1]][MULTI_JOIN inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
]]>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
index 50a1a8bc7fe..d280cd87b0d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
@@ -1663,12 +1663,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2],
select=[a1, b1, a2, b2])
</TestCase>
<TestCase name="testMultiJoinHints">
<Resource name="sql">
- <![CDATA[select /*+ NEST_LOOP(T1),
BROADCAST(T1),SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1) */* from T1 join T2 on T1.a1 =
T2.a2]]>
+ <![CDATA[select /*+ NEST_LOOP(T1),
BROADCAST(T1),SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),MULTI_JOIN(T1) */* from T1
join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP
inheritPath:[0] options:[T1]][BROADCAST inheritPath:[0]
options:[T1]][SHUFFLE_HASH inheritPath:[0] options:[T1]][SHUFFLE_MERGE
inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP
inheritPath:[0] options:[T1]][BROADCAST inheritPath:[0]
options:[T1]][SHUFFLE_HASH inheritPath:[0] options:[T1]][SHUFFLE_MERGE
inheritPath:[0] options:[T1]][MULTI_JOIN inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
]]>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
index 63a9af10ea7..c6f3c923360 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
@@ -1702,12 +1702,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2],
select=[a1, b1, a2, b2])
</TestCase>
<TestCase name="testMultiJoinHints">
<Resource name="sql">
- <![CDATA[select /*+ SHUFFLE_HASH(T1),
BROADCAST(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 =
T2.a2]]>
+ <![CDATA[select /*+ SHUFFLE_HASH(T1),
BROADCAST(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1 join
T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner],
joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]][BROADCAST
inheritPath:[0] options:[T1]][SHUFFLE_MERGE inheritPath:[0]
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner],
joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]][BROADCAST
inheritPath:[0] options:[T1]][SHUFFLE_MERGE inheritPath:[0]
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]][MULTI_JOIN
inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
index d8d9f36f4e6..0c12624194c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
@@ -1702,12 +1702,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2],
select=[a1, b1, a2, b2])
</TestCase>
<TestCase name="testMultiJoinHints">
<Resource name="sql">
- <![CDATA[select /*+ SHUFFLE_MERGE(T1),
BROADCAST(T1),SHUFFLE_HASH(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 =
T2.a2]]>
+ <![CDATA[select /*+ SHUFFLE_MERGE(T1),
BROADCAST(T1),SHUFFLE_HASH(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1 join T2
on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner],
joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]][BROADCAST
inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0]
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner],
joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]][BROADCAST
inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0]
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]][MULTI_JOIN
inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
index 693f5948a69..75a756dc770 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
@@ -919,12 +919,12 @@ LogicalUnion(all=[false]), rowType=[RecordType(BIGINT a1,
VARCHAR(2147483647) b1
</TestCase>
<TestCase name="testMultiJoinHints">
<Resource name="sql">
- <![CDATA[select /*+ BROADCAST(T1),
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 =
T2.a2]]>
+ <![CDATA[select /*+ BROADCAST(T1),
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1
join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT
a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE
options:[LEFT]][NEST_LOOP options:[LEFT]]]]), rowType=[RecordType(BIGINT a1,
VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE
options:[LEFT]][NEST_LOOP options:[LEFT]][MULTI_JOIN options:[LEFT]]]]),
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2,
VARCHAR(2147483647) b2)]
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]),
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]),
rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml
index a3a39373d8e..24db2525969 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml
@@ -919,12 +919,12 @@ LogicalUnion(all=[false]), rowType=[RecordType(BIGINT a1,
VARCHAR(2147483647) b1
</TestCase>
<TestCase name="testMultiJoinHints">
<Resource name="sql">
- <![CDATA[select /*+ BROADCAST(T1),
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 =
T2.a2]]>
+ <![CDATA[select /*+ BROADCAST(T1),
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1
join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT
a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE
options:[LEFT]][NEST_LOOP options:[LEFT]]]]), rowType=[RecordType(BIGINT a1,
VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE
options:[LEFT]][NEST_LOOP options:[LEFT]][MULTI_JOIN options:[LEFT]]]]),
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2,
VARCHAR(2147483647) b2)]
:- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1,
VARCHAR(2147483647) b1)]
+- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2,
VARCHAR(2147483647) b2)]
]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 6c3a455d789..aae1eb85015 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -215,6 +215,43 @@ Calc(select=[user_id, name, order_id, dept_name,
project_name, budget])
+- ChangelogNormalize(key=[project_id], condition=[=(status,
'ACTIVE')])
+- Exchange(distribution=[hash[project_id]])
+- TableSourceScan(table=[[default_catalog, default_database,
Projects, filter=[]]], fields=[project_id, project_name, dept_id, status])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChainedMultiJoinHints">
+ <Resource name="sql">
+ <![CDATA[SELECT /*+ MULTI_JOIN(u, subq) */ u.user_id, u.name,
subq.order_id, subq.payment_id, subq.location FROM Users u INNER JOIN ( SELECT
/*+ MULTI_JOIN(o, p, s) */ o.order_id, o.user_id u1, p.payment_id, p.user_id
u2, s.location, s.user_id u3 FROM Orders o INNER JOIN Payments p ON
o.user_id = p.user_id INNER JOIN Shipments s ON p.user_id = s.user_id) subq
ON u.user_id = subq.u1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$5],
location=[$7])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, subq]]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, Users]],
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+ +- LogicalProject(order_id=[$0], u1=[$1], payment_id=[$3], u2=[$5],
location=[$6], u3=[$7])
+ +- LogicalJoin(condition=[=($5, $7)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[o, p, s]]]])
+ :- LogicalJoin(condition=[=($1, $5)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[o, p, s]]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database,
Orders]], hints=[[[ALIAS inheritPath:[] options:[o]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
Payments]], hints=[[[ALIAS inheritPath:[] options:[p]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
Shipments]], hints=[[[ALIAS inheritPath:[] options:[s]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id, location])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER],
inputUniqueKeys=[(user_id), noUniqueKey], joinConditions=[=(user_id, u1)],
select=[user_id,name,order_id,u1,payment_id,location],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) u1, VARCHAR(2147483647)
payment_id, VARCHAR(2147483647) location)])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- ChangelogNormalize(key=[user_id])
+ : +- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Users, project=[user_id, name], metadata=[]]], fields=[user_id, name])
+ +- Exchange(distribution=[hash[user_id]])
+ +- Calc(select=[order_id, user_id, payment_id, location])
+ +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER],
inputUniqueKeys=[(order_id), (payment_id), noUniqueKey],
joinConditions=[=(user_id, user_id0), =(user_id0, user_id1)],
select=[order_id,user_id,payment_id,user_id0,location,user_id1],
rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id,
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id0,
VARCHAR(2147483647) location, VARCHAR(2147483647) user_id1)])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id,
user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
Shipments]], fields=[location, user_id])
]]>
</Resource>
</TestCase>
@@ -1324,6 +1361,151 @@ MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER],
inputUniqueKeys=[(user_id)
+- Exchange(distribution=[hash[user_id]])
+- Calc(select=[payment_id, user_id, price], where=[>(price, 100)])
+- TableSourceScan(table=[[default_catalog, default_database,
Payments, filter=[]]], fields=[payment_id, price, user_id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoinHintCombinedWithStateTtlHint">
+ <Resource name="sql">
+ <![CDATA[SELECT /*+ MULTI_JOIN(u, o, p), STATE_TTL(u='1d', o='2d',
p='1h') */u.user_id, u.name, o.order_id, p.payment_id FROM Users u INNER JOIN
Orders o ON u.user_id = o.user_id INNER JOIN Payments p ON u.user_id =
p.user_id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o, p]]]],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{p=1h, u=1d, o=2d}]]])
+ :- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o, p]]]],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{p=1h, u=1d, o=2d}]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, Users]],
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]],
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Payments]],
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER],
inputUniqueKeys=[(user_id), (order_id), (payment_id)],
stateTtlHints=[[[STATE_TTL options:[1d, 2d, 1h]]]], joinConditions=[=(user_id,
user_id0), =(user_id, user_id1)],
select=[user_id,name,order_id,user_id0,payment_id,user_id1],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647)
payment_id, VARCHAR(2147483647) user_id1)])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Users,
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Orders,
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, Payments,
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoinHintPartialMatch">
+ <Resource name="sql">
+ <![CDATA[SELECT /*+ MULTI_JOIN(u, o) */u.user_id, u.name, o.order_id,
s.location FROM Users u INNER JOIN Orders o ON u.user_id = o.user_id INNER JOIN
Shipments s ON u.user_id = s.user_id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], location=[$6])
++- LogicalJoin(condition=[=($0, $7)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o]]]])
+ :- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o]]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, Users]],
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]],
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Shipments]],
hints=[[[ALIAS inheritPath:[] options:[s]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, name, order_id, location])
++- Join(joinType=[InnerJoin], where=[=(user_id, user_id0)], select=[user_id,
name, order_id, location, user_id0], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- Calc(select=[user_id, name, order_id])
+ : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER],
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id, user_id0)],
select=[user_id,name,order_id,user_id0],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+ : :- Exchange(distribution=[hash[user_id]])
+ : : +- ChangelogNormalize(key=[user_id])
+ : : +- Exchange(distribution=[hash[user_id]])
+ : : +- TableSourceScan(table=[[default_catalog,
default_database, Users, project=[user_id, name], metadata=[]]],
fields=[user_id, name])
+ : +- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
Shipments]], fields=[location, user_id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoinHintWithMixedNamesAndAliases">
+ <Resource name="sql">
+ <![CDATA[SELECT /*+ MULTI_JOIN(Users, o, p) */ Users.user_id,
Users.name, o.order_id, p.payment_id FROM Users INNER JOIN Orders o ON
Users.user_id = o.user_id INNER JOIN Payments p ON Users.user_id = p.user_id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[Users, o, p]]]])
+ :- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[Users, o, p]]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, Users]],
hints=[[[ALIAS inheritPath:[] options:[Users]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]],
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Payments]],
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER],
inputUniqueKeys=[(user_id), (order_id), (payment_id)],
joinConditions=[=(user_id, user_id0), =(user_id, user_id1)],
select=[user_id,name,order_id,user_id0,payment_id,user_id1],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647)
payment_id, VARCHAR(2147483647) user_id1)])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Users,
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Orders,
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, Payments,
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoinHintWithTableNames">
+ <Resource name="sql">
+ <![CDATA[SELECT /*+ MULTI_JOIN(Users, Orders, Payments) */Users.user_id,
Users.name, Orders.order_id, Payments.payment_id FROM Users INNER JOIN Orders
ON Users.user_id = Orders.user_id INNER JOIN Payments ON Users.user_id =
Payments.user_id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[Users, Orders, Payments]]]])
+ :- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[Users, Orders, Payments]]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, Users]],
hints=[[[ALIAS inheritPath:[] options:[Users]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]],
hints=[[[ALIAS inheritPath:[] options:[Orders]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Payments]],
hints=[[[ALIAS inheritPath:[] options:[Payments]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER],
inputUniqueKeys=[(user_id), (order_id), (payment_id)],
joinConditions=[=(user_id, user_id0), =(user_id, user_id1)],
select=[user_id,name,order_id,user_id0,payment_id,user_id1],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647)
payment_id, VARCHAR(2147483647) user_id1)])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Users,
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Orders,
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, Payments,
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoinPartialHintCombinedWithStateTtlHint">
+ <Resource name="sql">
+ <![CDATA[SELECT /*+ MULTI_JOIN(u, o), STATE_TTL(u='1d', o='2d', p='1h')
*/u.user_id, u.name, o.order_id, p.payment_id FROM Users u INNER JOIN Orders o
ON u.user_id = o.user_id INNER JOIN Payments p ON u.user_id = p.user_id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o]]]],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{p=1h, u=1d, o=2d}]]])
+ :- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o]]]],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{p=1h, u=1d, o=2d}]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, Users]],
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]],
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Payments]],
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- Join(joinType=[InnerJoin], where=[=(user_id, user_id0)], select=[user_id,
name, order_id, payment_id, user_id0], leftInputSpec=[NoUniqueKey],
rightInputSpec=[HasUniqueKey], stateTtlHints=[[[STATE_TTL options:{RIGHT=1h}]]])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- Calc(select=[user_id, name, order_id])
+ : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER],
inputUniqueKeys=[(user_id), (order_id)], stateTtlHints=[[[STATE_TTL
options:[1d, 2d]]]], joinConditions=[=(user_id, user_id0)],
select=[user_id,name,order_id,user_id0],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+ : :- Exchange(distribution=[hash[user_id]])
+ : : +- ChangelogNormalize(key=[user_id])
+ : : +- Exchange(distribution=[hash[user_id]])
+ : : +- TableSourceScan(table=[[default_catalog,
default_database, Users, project=[user_id, name], metadata=[]]],
fields=[user_id, name])
+ : +- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, Payments,
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
]]>
</Resource>
</TestCase>
@@ -1359,6 +1541,47 @@ Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(),
_UTF-16LE'yyMMdd') AS day], cha
+- TableSourceScan(table=[[default_catalog, default_database, src1,
project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultipleMultiJoinHintsInDifferentBranches">
+ <Resource name="sql">
+ <![CDATA[SELECT left_side.user_id, left_side.order_id,
right_side.payment_id, right_side.location FROM ( SELECT /*+ MULTI_JOIN(u, o)
*/ u.user_id, o.order_id, o.user_id ouid FROM Users u INNER JOIN Orders o
ON u.user_id = o.user_id) left_side INNER JOIN ( SELECT /*+ MULTI_JOIN(p, s)
*/ p.payment_id, p.user_id, s.location, s.user_id suid FROM Payments p
INNER JOIN Shipments s ON p.user_id = s.user_id) right_side ON
left_side.user_id = right_side.user_id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], order_id=[$1], payment_id=[$3], location=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ :- LogicalProject(user_id=[$0], order_id=[$3], ouid=[$4])
+ : +- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o]]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database,
Users]], hints=[[[ALIAS inheritPath:[] options:[u]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
Orders]], hints=[[[ALIAS inheritPath:[] options:[o]]]])
+ +- LogicalProject(payment_id=[$0], user_id=[$2], location=[$3], suid=[$4])
+ +- LogicalJoin(condition=[=($2, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[p, s]]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
Payments]], hints=[[[ALIAS inheritPath:[] options:[p]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
Shipments]], hints=[[[ALIAS inheritPath:[] options:[s]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, order_id, payment_id, location])
++- Join(joinType=[InnerJoin], where=[=(user_id, user_id0)], select=[user_id,
order_id, payment_id, user_id0, location], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- Calc(select=[user_id, order_id])
+ : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER],
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id, user_id0)],
select=[user_id,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647)
user_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+ : :- Exchange(distribution=[hash[user_id]])
+ : : +- ChangelogNormalize(key=[user_id])
+ : : +- Exchange(distribution=[hash[user_id]])
+ : : +- TableSourceScan(table=[[default_catalog,
default_database, Users, project=[user_id], metadata=[]]], fields=[user_id])
+ : +- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- Calc(select=[payment_id, user_id, location])
+ +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER],
inputUniqueKeys=[(payment_id), noUniqueKey], joinConditions=[=(user_id,
user_id0)], select=[payment_id,user_id,location,user_id0],
rowType=[RecordType(VARCHAR(2147483647) payment_id, VARCHAR(2147483647)
user_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id0)])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id,
user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
Shipments]], fields=[location, user_id])
]]>
</Resource>
</TestCase>
@@ -1987,6 +2210,33 @@ MultiJoin(commonJoinKey=[noCommonJoinKey],
joinTypes=[INNER], inputUniqueKeys=[n
: +- TableSourceScan(table=[[default_catalog, default_database, Orders,
project=[order_id], metadata=[]]], fields=[order_id])
+- Exchange(distribution=[single])
+- TableSourceScan(table=[[default_catalog, default_database, Payments,
project=[payment_id], metadata=[]]], fields=[payment_id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testThreeWayJoinWithMultiJoinHint">
+ <Resource name="sql">
+ <![CDATA[SELECT /*+ MULTI_JOIN(u, o, p) */u.user_id, u.name, o.order_id,
p.payment_id FROM Users u INNER JOIN Orders o ON u.user_id = o.user_id LEFT
JOIN Payments p ON u.user_id = p.user_id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[left], joinHints=[[[MULTI_JOIN
inheritPath:[0] options:[u, o, p]]]])
+ :- LogicalJoin(condition=[=($0, $4)], joinType=[inner],
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o, p]]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, Users]],
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]],
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Payments]],
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT],
inputUniqueKeys=[(user_id), (order_id), (payment_id)],
joinConditions=[=(user_id, user_id0), =(user_id, user_id1)],
select=[user_id,name,order_id,user_id0,payment_id,user_id1],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647)
payment_id, VARCHAR(2147483647) user_id1)])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Users,
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+ :- Exchange(distribution=[hash[user_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, Orders,
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+ +- Exchange(distribution=[hash[user_id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, Payments,
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
]]>
</Resource>
</TestCase>