This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 271266a9cd5 [FLINK-38731][table] Add support for MULTI_JOIN hint
271266a9cd5 is described below

commit 271266a9cd5e4b5d3d564c956cfcda76cb60fa36
Author: Gustavo de Morais <[email protected]>
AuthorDate: Wed Nov 26 11:13:00 2025 +0000

    [FLINK-38731][table] Add support for MULTI_JOIN hint
---
 docs/content/docs/dev/table/sql/queries/hints.md   |  43 ++++
 docs/content/docs/dev/table/tuning.md              |  34 ++-
 .../table/planner/hint/FlinkHintStrategies.java    |   5 +
 .../flink/table/planner/hint/JoinStrategy.java     |  11 +-
 .../plan/rules/logical/JoinToMultiJoinRule.java    |  40 +++-
 .../plan/optimize/program/FlinkStreamProgram.scala |  30 ++-
 .../nodes/exec/stream/MultiJoinSemanticTests.java  |   3 +-
 .../nodes/exec/stream/MultiJoinTestPrograms.java   |  30 +++
 .../planner/plan/stream/sql/MultiJoinTest.java     | 127 +++++++++++
 .../plan/hints/batch/BroadcastJoinHintTest.xml     |   4 +-
 .../plan/hints/batch/NestLoopJoinHintTest.xml      |   4 +-
 .../plan/hints/batch/ShuffleHashJoinHintTest.xml   |   4 +-
 .../plan/hints/batch/ShuffleMergeJoinHintTest.xml  |   4 +-
 .../optimize/ClearQueryBlockAliasResolverTest.xml  |   4 +-
 .../plan/optimize/QueryHintsResolverTest.xml       |   4 +-
 .../planner/plan/stream/sql/MultiJoinTest.xml      | 250 +++++++++++++++++++++
 16 files changed, 558 insertions(+), 39 deletions(-)
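
As a reading aid for the JoinToMultiJoinRule hunk below: the enablement check added by this commit reduces to "the config option is on, or the join carries a MULTI_JOIN hint with at least one list option". What follows is a minimal, self-contained sketch of that decision only, not the planner code. `MultiJoinGate`, its nested `Hint` class, and `shouldUseMultiJoin` are hypothetical names introduced here for illustration; in the commit the hint is read from the join node and the flag from `TableConfig`, and the rule performs further eligibility checks before this point.

```java
import java.util.List;

// Hypothetical illustration class; not part of the Flink code base.
final class MultiJoinGate {
    static final String MULTI_JOIN = "MULTI_JOIN";

    // Hypothetical stand-in for a join hint: a name plus its list options.
    static final class Hint {
        final String name;
        final List<String> listOptions;

        Hint(String name, List<String> listOptions) {
            this.name = name;
            this.listOptions = listOptions;
        }
    }

    // True if any hint is named MULTI_JOIN and lists at least one table.
    static boolean hasMultiJoinHint(List<Hint> hints) {
        return hints.stream()
                .anyMatch(h -> MULTI_JOIN.equals(h.name) && !h.listOptions.isEmpty());
    }

    // Either the (assumed) config flag or a valid hint enables the rewrite.
    static boolean shouldUseMultiJoin(boolean enabledViaConfig, List<Hint> hints) {
        return enabledViaConfig || hasMultiJoinHint(hints);
    }
}
```

Under these assumptions, a join hinted with `MULTI_JOIN(t1, t2, t3)` qualifies even when the config flag is false, while a join without the hint qualifies only when the flag is true.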

diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md
index e60d6dbc6ea..3cb34267bcc 100644
--- a/docs/content/docs/dev/table/sql/queries/hints.md
+++ b/docs/content/docs/dev/table/sql/queries/hints.md
@@ -279,6 +279,49 @@ SELECT /*+ NEST_LOOP(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
 SELECT /*+ NEST_LOOP(t1, t3) */ * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON t1.id = t3.id;
 ```
 
+#### MULTI_JOIN
+
+{{< label Streaming >}}
+
+`MULTI_JOIN` suggests that Flink use the MultiJoin operator to process multiple regular joins simultaneously. This join hint is recommended when you have multiple joins that share at least one common join key and experience large intermediate state or record amplification. The MultiJoin operator eliminates intermediate state by processing joins across its input streams simultaneously, which can significantly reduce state size and improve performance in some cases.
+
+For more details on the MultiJoin operator, including when to use it and configuration options, see [Multiple Regular Joins]({{< ref "docs/dev/table/tuning" >}}#multiple-regular-joins).
+
+{{< hint info >}}
+Note:
+- The MULTI_JOIN hint can specify table names or table aliases. If a table has an alias, the hint must use the alias name.
+- At least one key must be shared between the join conditions for the MultiJoin operator to be applied.
+- When specified, the MULTI_JOIN hint applies to the tables listed in the hint within the current query block.
+{{< /hint >}}
+
+##### Examples
+
+```sql
+CREATE TABLE t1 (id BIGINT, name STRING, age INT) WITH (...);
+CREATE TABLE t2 (id BIGINT, name STRING, age INT) WITH (...);
+CREATE TABLE t3 (id BIGINT, name STRING, age INT) WITH (...);
+
+-- Flink will use the MultiJoin operator for the three-way join.
+SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1 
+JOIN t2 ON t1.id = t2.id 
+JOIN t3 ON t1.id = t3.id;
+
+-- Using table names instead of aliases.
+SELECT /*+ MULTI_JOIN(Users, Orders, Payments) */ * FROM Users 
+INNER JOIN Orders ON Users.user_id = Orders.user_id 
+INNER JOIN Payments ON Users.user_id = Payments.user_id;
+
+-- Partial match: only t1 and t2 use MultiJoin; t3 is joined with a regular join.
+SELECT /*+ MULTI_JOIN(t1, t2) */ * FROM t1 
+JOIN t2 ON t1.id = t2.id 
+JOIN t3 ON t1.id = t3.id;
+
+-- Combining MULTI_JOIN with STATE_TTL hint.
+SELECT /*+ MULTI_JOIN(t1, t2, t3), STATE_TTL('t1'='1d', 't2'='2d', 't3'='12h') */ * FROM t1 
+JOIN t2 ON t1.id = t2.id 
+JOIN t3 ON t1.id = t3.id;
+```
+
 #### LOOKUP
 
 {{< label Streaming >}}
diff --git a/docs/content/docs/dev/table/tuning.md b/docs/content/docs/dev/table/tuning.md
index b600dd7f908..af099b6395d 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -307,9 +307,9 @@ MiniBatch optimization is disabled by default for regular join. In order to enab
 
 {{< label Streaming >}}
 
-Streaming Flink jobs with multiple non-temporal regular joins often experience 
operational instability and performance degradation due to large state sizes. 
This is often because the intermediate state created by a chain of joins is 
much larger than the input state itself. In Flink 2.1, we introduce a new 
multi-join operator, an optimization designed to significantly reduce state 
size and improve performance for join pipelines that involve record 
amplification and large intermediate stat [...]
+Streaming Flink jobs with multiple non-temporal regular joins often experience 
operational instability and performance degradation due to large state sizes. 
This is often because the intermediate state created by a chain of joins is 
much larger than the input state itself. In Flink 2.1, we introduce a new 
multi-join operator, an optimization designed to significantly reduce state 
size and improve performance for join pipelines that involve record 
amplification and large intermediate stat [...]
 
-In most joins, a significant portion of processing time is spent fetching 
records from the state. The efficiency of the MultiJoin operator largely 
depends on the size of this intermediate state. In a common scenario where a 
pipeline experiences record amplification—meaning each join produces more data 
and records than the previous one, the MultiJoin operator is more efficient. 
This is because it keeps the state on which the operator interacts much 
smaller, leading to a more stable operat [...]
+In most joins, a significant portion of processing time is spent fetching 
records from the state. The efficiency of the MultiJoin operator largely 
depends on the size of this intermediate state and the selectivity of the 
common join key(s). In a common scenario where a pipeline experiences record 
amplification—meaning each join produces more data and records than the 
previous one, the MultiJoin operator is more efficient. This is because it 
keeps the state on which the operator interacts [...]
 
 ### The MultiJoin Operator
 The main benefits of the MultiJoin operator are:
@@ -318,21 +318,39 @@ The main benefits of the MultiJoin operator are:
 2) Improved performance for chained joins with record amplification.
 3) Improved stability: linear state growth with amount of records processed, instead of polynomial growth with binary joins. 
 
-Also, pipelines with MultiJoin instead of binary joins usually have faster initialization and recovery times due to smaller state and fewer amount of nodes.
+Also, pipelines with MultiJoin instead of binary joins usually have faster initialization and recovery times due to smaller state and fewer nodes.
 
 ### When to enable the MultiJoin?
 
-If your job has multiple joins that share at least one common join key, and you observe that the intermediate state in the intermediate joins is larger than the inputs sources, consider enabling the MultiJoin operator.
+If your job has multiple joins that share at least one common join key, and you observe that the intermediate state in the intermediate joins is larger than the input sources, consider enabling the MultiJoin operator.
+
+Recommended use cases:
+- The common join key(s) have high selectivity (the number of records per key is small)
+- The statement has several chained joins and considerable intermediate state
+- There is no considerable data skew on the common join key(s)
+- The joins generate large state (50+ GB)
+
+If your common join key(s) exhibit low selectivity (i.e., a high number of rows sharing the same key value), the MultiJoin operator's required recomputation of the intermediate state can severely impact performance. In such scenarios, binary joins are recommended, as these will partition the data using all join keys.
 
 ### How to enable the MultiJoin?
 
-To enable this optimization, set the following configuration
+To enable this optimization globally for all eligible joins, set the following configuration:
 
 ```sql
 SET 'table.optimizer.multi-join.enabled' = 'true';
 ```
 
-Important: This is currently in an experimental state - there are open optimizations and breaking changes might be implemented in this version. We currently support only streaming INNER/LEFT joins. Support for RIGHT joins will be added soon. Due to records partitioning, you need at least one key that is shared between the join conditions, see:
+Alternatively, you can enable the MultiJoin operator for specific tables using the `MULTI_JOIN` hint:
+
+```sql
+SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1 
+JOIN t2 ON t1.id = t2.id 
+JOIN t3 ON t1.id = t3.id;
+```
+
+The hint approach allows you to selectively apply the MultiJoin optimization to specific query blocks without enabling it globally. You can specify either table names or table aliases in the hint. For more details on the MULTI_JOIN hint, see [Join Hints]({{< ref "docs/dev/table/sql/queries/hints" >}}#multi_join).
+
+Important: This is currently in an experimental state - optimizations and breaking changes might be implemented. We currently support only streaming INNER/LEFT joins. Because records are partitioned by key, you need at least one key that is shared between the join conditions, see:
 
 - Supported: A JOIN B ON A.key = B.key JOIN C ON A.key = C.key (Partition by key)
 - Supported: A JOIN B ON A.key = B.key JOIN C ON B.key = C.key (Partition by key via transitivity)
@@ -349,9 +367,9 @@ For this 10-way join above, involving record amplification, we've observed signi
 - Performance: 2x to over 100x+ increase in processed records when both at 100% busyness.
 - State Size: 3x to over 1000x+ smaller as intermediate state grows.
 
-The total state is always smaller with the MultiJoin operator. In this case, the performance is initially the same, but as the intermediate state grows, the performance of binary joins degrade and the multi join remains stable and outperforms.
+The total state is always smaller with the MultiJoin operator. In this case, the performance is initially the same, but as the intermediate state grows, the performance of binary joins degrades and the multi join remains stable and outperforms.
 
-This general benchmark for the 10-way join was run with the following 
configuration: 10 upsert kafka topics, 10 parallelism, 1 record per second per 
topic. We used rocksdb with unaligned checkpoints and with incremental 
checkpoints. Each job ran in one TaskManager containing 8GB process memory, 1GB 
off-heap memory and 20% network memory. The JobManager had 4GB process memory. 
The host machine contained a M1 processor chip, 32GB RAM and 1TB SSD. The sink 
uses a blackhole connector so we o [...]
+This general benchmark for the 10-way join was run with the following 
configuration: 1 record per tenant_id (high selectivity), 10 upsert kafka 
topics, 10 parallelism, 1 record per second per topic. We used rocksdb with 
unaligned checkpoints and with incremental checkpoints. Each job ran in one 
TaskManager containing 8GB process memory, 1GB off-heap memory and 20% network 
memory. The JobManager had 4GB process memory. The host machine contained a M1 
processor chip, 32GB RAM and 1TB SSD.  [...]
 
 ```sql
 INSERT INTO JoinResultsMJ
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
index c83e0affef4..33e6d4346dd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
@@ -121,6 +121,11 @@ public abstract class FlinkHintStrategies {
                                                 HintPredicates.CORRELATE, HintPredicates.JOIN))
                                 .optionChecker(LOOKUP_NON_EMPTY_KV_OPTION_CHECKER)
                                 .build())
+                .hintStrategy(
+                        JoinStrategy.MULTI_JOIN.getJoinHintName(),
+                        HintStrategy.builder(HintPredicates.JOIN)
+                                .optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
+                                .build())
                 .hintStrategy(
                         StateTtlHint.STATE_TTL.getHintName(),
                         HintStrategy.builder(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
index c7aff224590..0ded47cb222 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
@@ -48,7 +48,14 @@ public enum JoinStrategy {
     NEST_LOOP("NEST_LOOP"),
 
     /** Instructs the optimizer to use lookup join strategy. Only accept key-value hint options. */
-    LOOKUP("LOOKUP");
+    LOOKUP("LOOKUP"),
+
+    /**
+     * Instructs the optimizer to use multi-way join strategy for streaming queries. This hint
+     * allows specifying multiple tables to be joined together in a single {@link
+     * org.apache.flink.table.runtime.operators.join.stream.StreamingMultiJoinOperator}.
+     */
+    MULTI_JOIN("MULTI_JOIN");
 
     private final String joinHintName;
 
@@ -83,6 +90,8 @@ public enum JoinStrategy {
                 return options.size() > 0;
             case LOOKUP:
                 return null == options || options.size() == 0;
+            case MULTI_JOIN:
+                return options.size() > 0;
         }
         return false;
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
index 81f27d29788..1712176f1f7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.table.planner.plan.rules.logical;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinStrategy;
 import org.apache.flink.table.planner.hint.StateTtlHint;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMultiJoin;
 import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor;
 import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
 import org.apache.flink.table.types.logical.RowType;
@@ -170,7 +174,41 @@ public class JoinToMultiJoinRule extends RelRule<JoinToMultiJoinRule.Config>
             return false;
         }
 
-        return origJoin.getJoinType().projectsRight();
+        if (!origJoin.getJoinType().projectsRight()) {
+            return false;
+        }
+
+        // Enable multi-join if either config is enabled OR MULTI_JOIN hint is present
+        return isEnabledViaConfig(origJoin) || hasMultiJoinHint(origJoin);
+    }
+
+    /**
+     * Checks if multi-join optimization is enabled via configuration.
+     *
+     * @param join the join node
+     * @return true if TABLE_OPTIMIZER_MULTI_JOIN_ENABLED is set to true
+     */
+    private boolean isEnabledViaConfig(Join join) {
+        final TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(join);
+        return tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED);
+    }
+
+    /**
+     * Checks if the MULTI_JOIN hint is present on the join node.
+     *
+     * <p>Note: By the time this rule sees the join, the QueryHintsResolver has already validated
+     * the hint. If the hint is present with valid options, it means both sides of this join were
+     * mentioned in the original hint and have been validated.
+     *
+     * @param join the join node
+     * @return true if MULTI_JOIN hint is present and valid
+     */
+    private boolean hasMultiJoinHint(Join join) {
+        return join.getHints().stream()
+                .anyMatch(
+                        hint ->
+                                JoinStrategy.MULTI_JOIN.getJoinHintName().equals(hint.hintName)
+                                        && !hint.listOptions.isEmpty());
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
index e0e2481d5a2..8ec32c242fa 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
@@ -233,22 +233,20 @@ object FlinkStreamProgram {
     }
 
     // multi-join
-    if (tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED)) {
-      chainedProgram.addLast(
-        MULTI_JOIN,
-        FlinkGroupProgramBuilder
-          .newBuilder[StreamOptimizeContext]
-          .addProgram(
-            FlinkHepRuleSetProgramBuilder.newBuilder
-              .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
-              .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
-              .add(FlinkStreamRuleSets.MULTI_JOIN_RULES)
-              .build(),
-            "merge binary regular joins into MultiJoin"
-          )
-          .build()
-      )
-    }
+    chainedProgram.addLast(
+      MULTI_JOIN,
+      FlinkGroupProgramBuilder
+        .newBuilder[StreamOptimizeContext]
+        .addProgram(
+          FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            .add(FlinkStreamRuleSets.MULTI_JOIN_RULES)
+            .build(),
+          "merge binary regular joins into MultiJoin"
+        )
+        .build()
+    )
 
     // project rewrite
     chainedProgram.addLast(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
index 20d36a0dfda..706e512a3de 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
@@ -54,6 +54,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase {
                 MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION,
                 MultiJoinTestPrograms.MULTI_JOIN_TWO_WAY_INNER_JOIN_WITH_WHERE_IN,
                 MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_MULTI_KEY_TYPES,
-                MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED);
+                MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED,
+                MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
index c19a5a28092..9e3e7fa7631 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
@@ -1823,4 +1823,34 @@ public class MultiJoinTestPrograms {
                                     + "LEFT JOIN Shipments4K AS S ON U.k3 = S.k3 AND U.k2 > 150 AND U.k4 = S.k4 "
                                     + "WHERE U.k2 > 50")
                     .build();
+
+    public static final TableTestProgram MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT =
+            TableTestProgram.of(
+                            "three-way-inner-join-with-hint",
+                            "three way inner join using MULTI_JOIN hint")
+                    .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false)
+                    .setupTableSource(USERS_SOURCE)
+                    .setupTableSource(ORDERS_SOURCE)
+                    .setupTableSource(PAYMENTS_SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "user_id STRING",
+                                            "name STRING",
+                                            "order_id STRING",
+                                            "payment_id STRING")
+                                    .consumedValues(
+                                            "+I[1, Gus, order1, payment1]",
+                                            "+I[2, Bob, order2, payment2]",
+                                            "+I[2, Bob, order3, payment2]",
+                                            "+I[1, Gus, order1, payment3]")
+                                    .testMaterializedData()
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink "
+                                    + "SELECT /*+ MULTI_JOIN(u, o, p) */ u.user_id, u.name, o.order_id, p.payment_id "
+                                    + "FROM Users u "
+                                    + "INNER JOIN Orders o ON u.user_id = o.user_id "
+                                    + "INNER JOIN Payments p ON u.user_id = p.user_id")
+                    .build();
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
index 81cc1a10ef9..342a6913df3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
@@ -271,6 +271,133 @@ public class MultiJoinTest extends TableTestBase {
                         + "    ON u.user_id = p.user_id");
     }
 
+    @Test
+    void testThreeWayJoinWithMultiJoinHint() {
+        // Disable config so the MultiJoin is enabled by hints
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT /*+ MULTI_JOIN(u, o, p) */u.user_id, u.name, o.order_id, p.payment_id "
+                        + "FROM Users u "
+                        + "INNER JOIN Orders o ON u.user_id = o.user_id "
+                        + "LEFT JOIN Payments p ON u.user_id = p.user_id");
+    }
+
+    @Test
+    void testMultiJoinHintCombinedWithStateTtlHint() {
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT /*+ MULTI_JOIN(u, o, p), STATE_TTL(u='1d', o='2d', p='1h') */u.user_id, u.name, o.order_id, p.payment_id "
+                        + "FROM Users u "
+                        + "INNER JOIN Orders o ON u.user_id = o.user_id "
+                        + "INNER JOIN Payments p ON u.user_id = p.user_id");
+    }
+
+    @Test
+    void testMultiJoinPartialHintCombinedWithStateTtlHint() {
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT /*+ MULTI_JOIN(u, o), STATE_TTL(u='1d', o='2d', p='1h') */u.user_id, u.name, o.order_id, p.payment_id "
+                        + "FROM Users u "
+                        + "INNER JOIN Orders o ON u.user_id = o.user_id "
+                        + "INNER JOIN Payments p ON u.user_id = p.user_id");
+    }
+
+    @Test
+    void testMultiJoinHintWithTableNames() {
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT /*+ MULTI_JOIN(Users, Orders, Payments) */Users.user_id, Users.name, Orders.order_id, Payments.payment_id "
+                        + "FROM Users "
+                        + "INNER JOIN Orders ON Users.user_id = Orders.user_id "
+                        + "INNER JOIN Payments ON Users.user_id = Payments.user_id");
+    }
+
+    @Test
+    void testMultiJoinHintPartialMatch() {
+        // First join (u, o) should become MultiJoin with hint
+        // When the result joins with s, that's not in the hint so regular join should be used
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT /*+ MULTI_JOIN(u, o) */u.user_id, u.name, o.order_id, s.location "
+                        + "FROM Users u "
+                        + "INNER JOIN Orders o ON u.user_id = o.user_id "
+                        + "INNER JOIN Shipments s ON u.user_id = s.user_id");
+    }
+
+    @Test
+    void testMultiJoinHintWithMixedNamesAndAliases() {
+        // Hint uses table name for Users, but aliases for others - matching should work
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT /*+ MULTI_JOIN(Users, o, p) */ Users.user_id, Users.name, o.order_id, p.payment_id "
+                        + "FROM Users "
+                        + "INNER JOIN Orders o ON Users.user_id = o.user_id "
+                        + "INNER JOIN Payments p ON Users.user_id = p.user_id");
+    }
+
+    @Test
+    void testChainedMultiJoinHints() {
+        // Tests two separate MULTI_JOIN hints in the same query
+        // Inner subquery has MULTI_JOIN(o, p, s) and outer query joins with Users
+        // This verifies that multiple MULTI_JOIN hints can be used at different query levels
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT /*+ MULTI_JOIN(u, subq) */ u.user_id, u.name, subq.order_id, subq.payment_id, subq.location "
+                        + "FROM Users u "
+                        + "INNER JOIN ("
+                        + "  SELECT /*+ MULTI_JOIN(o, p, s) */ o.order_id, o.user_id u1, p.payment_id, p.user_id u2, s.location, s.user_id u3"
+                        + "  FROM Orders o "
+                        + "  INNER JOIN Payments p ON o.user_id = p.user_id "
+                        + "  INNER JOIN Shipments s ON p.user_id = s.user_id"
+                        + ") subq ON u.user_id = subq.u1");
+    }
+
+    @Test
+    void testMultipleMultiJoinHintsInDifferentBranches() {
+        // Tests multiple MULTI_JOIN hints where two separate multi-joins are joined together
+        // Left side: MULTI_JOIN(u, o)
+        // Right side: MULTI_JOIN(p, s)
+        // Then these two multi-joins are joined together
+        util.getTableEnv()
+                .getConfig()
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false);
+
+        util.verifyRelPlan(
+                "SELECT left_side.user_id, left_side.order_id, right_side.payment_id, right_side.location "
+                        + "FROM ("
+                        + "  SELECT /*+ MULTI_JOIN(u, o) */ u.user_id, o.order_id, o.user_id ouid "
+                        + "  FROM Users u "
+                        + "  INNER JOIN Orders o ON u.user_id = o.user_id"
+                        + ") left_side "
+                        + "INNER JOIN ("
+                        + "  SELECT /*+ MULTI_JOIN(p, s) */ p.payment_id, p.user_id, s.location, s.user_id suid "
+                        + "  FROM Payments p "
+                        + "  INNER JOIN Shipments s ON p.user_id = s.user_id"
+                        + ") right_side "
+                        + "ON left_side.user_id = right_side.user_id");
+    }
+
     @Test
     void testThreeWayLeftOuterJoinExecPlan() {
         util.verifyExecPlan(
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
index eadacfb9c55..4b24a4f7a39 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
@@ -1667,12 +1667,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2], select=[a1, b1, a2, b2])
   </TestCase>
   <TestCase name="testMultiJoinHints">
     <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1), SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
+      <![CDATA[select /*+ BROADCAST(T1), SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0] options:[T1]][SHUFFLE_MERGE inheritPath:[0] options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0] options:[T1]][SHUFFLE_MERGE inheritPath:[0] options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]][MULTI_JOIN inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
index 50a1a8bc7fe..d280cd87b0d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
@@ -1663,12 +1663,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2], 
select=[a1, b1, a2, b2])
   </TestCase>
   <TestCase name="testMultiJoinHints">
     <Resource name="sql">
-      <![CDATA[select /*+ NEST_LOOP(T1), 
BROADCAST(T1),SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1) */* from T1 join T2 on T1.a1 = 
T2.a2]]>
+      <![CDATA[select /*+ NEST_LOOP(T1), 
BROADCAST(T1),SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),MULTI_JOIN(T1) */* from T1 
join T2 on T1.a1 = T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP 
inheritPath:[0] options:[T1]][BROADCAST inheritPath:[0] 
options:[T1]][SHUFFLE_HASH inheritPath:[0] options:[T1]][SHUFFLE_MERGE 
inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP 
inheritPath:[0] options:[T1]][BROADCAST inheritPath:[0] 
options:[T1]][SHUFFLE_HASH inheritPath:[0] options:[T1]][SHUFFLE_MERGE 
inheritPath:[0] options:[T1]][MULTI_JOIN inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
index 63a9af10ea7..c6f3c923360 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
@@ -1702,12 +1702,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2], 
select=[a1, b1, a2, b2])
   </TestCase>
   <TestCase name="testMultiJoinHints">
     <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_HASH(T1), 
BROADCAST(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 = 
T2.a2]]>
+      <![CDATA[select /*+ SHUFFLE_HASH(T1), 
BROADCAST(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1 join 
T2 on T1.a1 = T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], 
joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]][BROADCAST 
inheritPath:[0] options:[T1]][SHUFFLE_MERGE inheritPath:[0] 
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], 
joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]][BROADCAST 
inheritPath:[0] options:[T1]][SHUFFLE_MERGE inheritPath:[0] 
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]][MULTI_JOIN 
inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
index d8d9f36f4e6..0c12624194c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
@@ -1702,12 +1702,12 @@ HashAggregate(isMerge=[true], groupBy=[a1, b1, a2, b2], 
select=[a1, b1, a2, b2])
   </TestCase>
   <TestCase name="testMultiJoinHints">
     <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_MERGE(T1), 
BROADCAST(T1),SHUFFLE_HASH(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 = 
T2.a2]]>
+      <![CDATA[select /*+ SHUFFLE_MERGE(T1), 
BROADCAST(T1),SHUFFLE_HASH(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1 join T2 
on T1.a1 = T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], 
joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]][BROADCAST 
inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0] 
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], 
joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]][BROADCAST 
inheritPath:[0] options:[T1]][SHUFFLE_HASH inheritPath:[0] 
options:[T1]][NEST_LOOP inheritPath:[0] options:[T1]][MULTI_JOIN 
inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
index 693f5948a69..75a756dc770 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
@@ -919,12 +919,12 @@ LogicalUnion(all=[false]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1
   </TestCase>
   <TestCase name="testMultiJoinHints">
     <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1), 
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 = 
T2.a2]]>
+      <![CDATA[select /*+ BROADCAST(T1), 
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1 
join T2 on T1.a1 = T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT 
a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST 
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE 
options:[LEFT]][NEST_LOOP options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST 
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE 
options:[LEFT]][NEST_LOOP options:[LEFT]][MULTI_JOIN options:[LEFT]]]]), 
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, 
VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), 
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), 
rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml
index a3a39373d8e..24db2525969 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/QueryHintsResolverTest.xml
@@ -919,12 +919,12 @@ LogicalUnion(all=[false]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1
   </TestCase>
   <TestCase name="testMultiJoinHints">
     <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1), 
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1) */* from T1 join T2 on T1.a1 = 
T2.a2]]>
+      <![CDATA[select /*+ BROADCAST(T1), 
SHUFFLE_HASH(T1),SHUFFLE_MERGE(T1),NEST_LOOP(T1),MULTI_JOIN(T1) */* from T1 
join T2 on T1.a1 = T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT 
a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST 
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE 
options:[LEFT]][NEST_LOOP options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST 
options:[LEFT]][SHUFFLE_HASH options:[LEFT]][SHUFFLE_MERGE 
options:[LEFT]][NEST_LOOP options:[LEFT]][MULTI_JOIN options:[LEFT]]]]), 
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, 
VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, 
VARCHAR(2147483647) b2)]
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 6c3a455d789..aae1eb85015 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -215,6 +215,43 @@ Calc(select=[user_id, name, order_id, dept_name, 
project_name, budget])
          +- ChangelogNormalize(key=[project_id], condition=[=(status, 
'ACTIVE')])
             +- Exchange(distribution=[hash[project_id]])
                +- TableSourceScan(table=[[default_catalog, default_database, 
Projects, filter=[]]], fields=[project_id, project_name, dept_id, status])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChainedMultiJoinHints">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ MULTI_JOIN(u, subq) */ u.user_id, u.name, 
subq.order_id, subq.payment_id, subq.location FROM Users u INNER JOIN (  SELECT 
/*+ MULTI_JOIN(o, p, s) */ o.order_id, o.user_id u1, p.payment_id, p.user_id 
u2, s.location, s.user_id u3  FROM Orders o   INNER JOIN Payments p ON 
o.user_id = p.user_id   INNER JOIN Shipments s ON p.user_id = s.user_id) subq 
ON u.user_id = subq.u1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$5], 
location=[$7])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, subq]]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, Users]], 
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+   +- LogicalProject(order_id=[$0], u1=[$1], payment_id=[$3], u2=[$5], 
location=[$6], u3=[$7])
+      +- LogicalJoin(condition=[=($5, $7)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[o, p, s]]]])
+         :- LogicalJoin(condition=[=($1, $5)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[o, p, s]]]])
+         :  :- LogicalTableScan(table=[[default_catalog, default_database, 
Orders]], hints=[[[ALIAS inheritPath:[] options:[o]]]])
+         :  +- LogicalTableScan(table=[[default_catalog, default_database, 
Payments]], hints=[[[ALIAS inheritPath:[] options:[p]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
Shipments]], hints=[[[ALIAS inheritPath:[] options:[s]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id, location])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER], 
inputUniqueKeys=[(user_id), noUniqueKey], joinConditions=[=(user_id, u1)], 
select=[user_id,name,order_id,u1,payment_id,location], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) u1, VARCHAR(2147483647) 
payment_id, VARCHAR(2147483647) location)])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- ChangelogNormalize(key=[user_id])
+   :     +- Exchange(distribution=[hash[user_id]])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
Users, project=[user_id, name], metadata=[]]], fields=[user_id, name])
+   +- Exchange(distribution=[hash[user_id]])
+      +- Calc(select=[order_id, user_id, payment_id, location])
+         +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(order_id), (payment_id), noUniqueKey], 
joinConditions=[=(user_id, user_id0), =(user_id0, user_id1)], 
select=[order_id,user_id,payment_id,user_id0,location,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id0, 
VARCHAR(2147483647) location, VARCHAR(2147483647) user_id1)])
+            :- Exchange(distribution=[hash[user_id]])
+            :  +- TableSourceScan(table=[[default_catalog, default_database, 
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+            :- Exchange(distribution=[hash[user_id]])
+            :  +- TableSourceScan(table=[[default_catalog, default_database, 
Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id, 
user_id])
+            +- Exchange(distribution=[hash[user_id]])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
Shipments]], fields=[location, user_id])
 ]]>
     </Resource>
   </TestCase>
@@ -1324,6 +1361,151 @@ MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER], 
inputUniqueKeys=[(user_id)
          +- Exchange(distribution=[hash[user_id]])
             +- Calc(select=[payment_id, user_id, price], where=[>(price, 100)])
                +- TableSourceScan(table=[[default_catalog, default_database, 
Payments, filter=[]]], fields=[payment_id, price, user_id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoinHintCombinedWithStateTtlHint">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ MULTI_JOIN(u, o, p), STATE_TTL(u='1d', o='2d', 
p='1h') */u.user_id, u.name, o.order_id, p.payment_id FROM Users u INNER JOIN 
Orders o ON u.user_id = o.user_id INNER JOIN Payments p ON u.user_id = 
p.user_id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o, p]]]], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{p=1h, u=1d, o=2d}]]])
+   :- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o, p]]]], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{p=1h, u=1d, o=2d}]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, Users]], 
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]], 
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, Payments]], 
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id), (payment_id)], 
stateTtlHints=[[[STATE_TTL options:[1d, 2d, 1h]]]], joinConditions=[=(user_id, 
user_id0), =(user_id, user_id1)], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
payment_id, VARCHAR(2147483647) user_id1)])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Orders, 
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+   +- Exchange(distribution=[hash[user_id]])
+      +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoinHintPartialMatch">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ MULTI_JOIN(u, o) */u.user_id, u.name, o.order_id, 
s.location FROM Users u INNER JOIN Orders o ON u.user_id = o.user_id INNER JOIN 
Shipments s ON u.user_id = s.user_id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], location=[$6])
++- LogicalJoin(condition=[=($0, $7)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o]]]])
+   :- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o]]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, Users]], 
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]], 
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, Shipments]], 
hints=[[[ALIAS inheritPath:[] options:[s]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, name, order_id, location])
++- Join(joinType=[InnerJoin], where=[=(user_id, user_id0)], select=[user_id, 
name, order_id, location, user_id0], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- Calc(select=[user_id, name, order_id])
+   :     +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER], 
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id, user_id0)], 
select=[user_id,name,order_id,user_id0], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+   :        :- Exchange(distribution=[hash[user_id]])
+   :        :  +- ChangelogNormalize(key=[user_id])
+   :        :     +- Exchange(distribution=[hash[user_id]])
+   :        :        +- TableSourceScan(table=[[default_catalog, 
default_database, Users, project=[user_id, name], metadata=[]]], 
fields=[user_id, name])
+   :        +- Exchange(distribution=[hash[user_id]])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, 
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+   +- Exchange(distribution=[hash[user_id]])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
Shipments]], fields=[location, user_id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoinHintWithMixedNamesAndAliases">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ MULTI_JOIN(Users, o, p) */ Users.user_id, 
Users.name, o.order_id, p.payment_id FROM Users INNER JOIN Orders o ON 
Users.user_id = o.user_id INNER JOIN Payments p ON Users.user_id = p.user_id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[Users, o, p]]]])
+   :- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[Users, o, p]]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, Users]], 
hints=[[[ALIAS inheritPath:[] options:[Users]]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]], 
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, Payments]], 
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id), (payment_id)], 
joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
payment_id, VARCHAR(2147483647) user_id1)])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Orders, 
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+   +- Exchange(distribution=[hash[user_id]])
+      +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoinHintWithTableNames">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ MULTI_JOIN(Users, Orders, Payments) */Users.user_id, 
Users.name, Orders.order_id, Payments.payment_id FROM Users INNER JOIN Orders 
ON Users.user_id = Orders.user_id INNER JOIN Payments ON Users.user_id = 
Payments.user_id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[Users, Orders, Payments]]]])
+   :- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[Users, Orders, Payments]]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, Users]], 
hints=[[[ALIAS inheritPath:[] options:[Users]]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]], 
hints=[[[ALIAS inheritPath:[] options:[Orders]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, Payments]], 
hints=[[[ALIAS inheritPath:[] options:[Payments]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id), (payment_id)], 
joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
payment_id, VARCHAR(2147483647) user_id1)])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Orders, 
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+   +- Exchange(distribution=[hash[user_id]])
+      +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoinPartialHintCombinedWithStateTtlHint">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ MULTI_JOIN(u, o), STATE_TTL(u='1d', o='2d', p='1h') 
*/u.user_id, u.name, o.order_id, p.payment_id FROM Users u INNER JOIN Orders o 
ON u.user_id = o.user_id INNER JOIN Payments p ON u.user_id = p.user_id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o]]]], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{p=1h, u=1d, o=2d}]]])
+   :- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o]]]], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{p=1h, u=1d, o=2d}]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, Users]], 
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]], 
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, Payments]], 
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- Join(joinType=[InnerJoin], where=[=(user_id, user_id0)], select=[user_id, 
name, order_id, payment_id, user_id0], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[HasUniqueKey], stateTtlHints=[[[STATE_TTL options:{RIGHT=1h}]]])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- Calc(select=[user_id, name, order_id])
+   :     +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER], 
inputUniqueKeys=[(user_id), (order_id)], stateTtlHints=[[[STATE_TTL 
options:[1d, 2d]]]], joinConditions=[=(user_id, user_id0)], 
select=[user_id,name,order_id,user_id0], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+   :        :- Exchange(distribution=[hash[user_id]])
+   :        :  +- ChangelogNormalize(key=[user_id])
+   :        :     +- Exchange(distribution=[hash[user_id]])
+   :        :        +- TableSourceScan(table=[[default_catalog, 
default_database, Users, project=[user_id, name], metadata=[]]], 
fields=[user_id, name])
+   :        +- Exchange(distribution=[hash[user_id]])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, 
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+   +- Exchange(distribution=[hash[user_id]])
+      +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
 ]]>
     </Resource>
   </TestCase>
@@ -1359,6 +1541,47 @@ Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 
_UTF-16LE'yyMMdd') AS day], cha
 +- TableSourceScan(table=[[default_catalog, default_database, src1, 
project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D])
 
 
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultipleMultiJoinHintsInDifferentBranches">
+    <Resource name="sql">
+      <![CDATA[SELECT left_side.user_id, left_side.order_id, 
right_side.payment_id, right_side.location FROM (  SELECT /*+ MULTI_JOIN(u, o) 
*/ u.user_id, o.order_id, o.user_id ouid   FROM Users u   INNER JOIN Orders o 
ON u.user_id = o.user_id) left_side INNER JOIN (  SELECT /*+ MULTI_JOIN(p, s) 
*/ p.payment_id, p.user_id, s.location, s.user_id suid   FROM Payments p   
INNER JOIN Shipments s ON p.user_id = s.user_id) right_side ON 
left_side.user_id = right_side.user_id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], order_id=[$1], payment_id=[$3], location=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   :- LogicalProject(user_id=[$0], order_id=[$3], ouid=[$4])
+   :  +- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[u, o]]]])
+   :     :- LogicalTableScan(table=[[default_catalog, default_database, 
Users]], hints=[[[ALIAS inheritPath:[] options:[u]]]])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
Orders]], hints=[[[ALIAS inheritPath:[] options:[o]]]])
+   +- LogicalProject(payment_id=[$0], user_id=[$2], location=[$3], suid=[$4])
+      +- LogicalJoin(condition=[=($2, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0] options:[p, s]]]])
+         :- LogicalTableScan(table=[[default_catalog, default_database, 
Payments]], hints=[[[ALIAS inheritPath:[] options:[p]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
Shipments]], hints=[[[ALIAS inheritPath:[] options:[s]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, order_id, payment_id, location])
++- Join(joinType=[InnerJoin], where=[=(user_id, user_id0)], select=[user_id, 
order_id, payment_id, user_id0, location], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- Calc(select=[user_id, order_id])
+   :     +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER], 
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id, user_id0)], 
select=[user_id,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) 
user_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+   :        :- Exchange(distribution=[hash[user_id]])
+   :        :  +- ChangelogNormalize(key=[user_id])
+   :        :     +- Exchange(distribution=[hash[user_id]])
+   :        :        +- TableSourceScan(table=[[default_catalog, 
default_database, Users, project=[user_id], metadata=[]]], fields=[user_id])
+   :        +- Exchange(distribution=[hash[user_id]])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, 
Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+   +- Exchange(distribution=[hash[user_id]])
+      +- Calc(select=[payment_id, user_id, location])
+         +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER], 
inputUniqueKeys=[(payment_id), noUniqueKey], joinConditions=[=(user_id, 
user_id0)], select=[payment_id,user_id,location,user_id0], 
rowType=[RecordType(VARCHAR(2147483647) payment_id, VARCHAR(2147483647) 
user_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id0)])
+            :- Exchange(distribution=[hash[user_id]])
+            :  +- TableSourceScan(table=[[default_catalog, default_database, 
Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id, 
user_id])
+            +- Exchange(distribution=[hash[user_id]])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
Shipments]], fields=[location, user_id])
 ]]>
     </Resource>
   </TestCase>
@@ -1987,6 +2210,33 @@ MultiJoin(commonJoinKey=[noCommonJoinKey], 
joinTypes=[INNER], inputUniqueKeys=[n
 :        +- TableSourceScan(table=[[default_catalog, default_database, Orders, 
project=[order_id], metadata=[]]], fields=[order_id])
 +- Exchange(distribution=[single])
    +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id], metadata=[]]], fields=[payment_id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testThreeWayJoinWithMultiJoinHint">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ MULTI_JOIN(u, o, p) */u.user_id, u.name, o.order_id, 
p.payment_id FROM Users u INNER JOIN Orders o ON u.user_id = o.user_id LEFT 
JOIN Payments p ON u.user_id = p.user_id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[=($0, $8)], joinType=[left], joinHints=[[[MULTI_JOIN 
inheritPath:[0] options:[u, o, p]]]])
+   :- LogicalJoin(condition=[=($0, $4)], joinType=[inner], 
joinHints=[[[MULTI_JOIN inheritPath:[0, 0] options:[u, o, p]]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, Users]], 
hints=[[[ALIAS inheritPath:[] options:[u]]]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]], 
hints=[[[ALIAS inheritPath:[] options:[o]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, Payments]], 
hints=[[[ALIAS inheritPath:[] options:[p]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[user_id, name, order_id, payment_id])
++- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[(user_id), (order_id), (payment_id)], 
joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
payment_id, VARCHAR(2147483647) user_id1)])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id, name], metadata=[]]], fields=[user_id, name])
+   :- Exchange(distribution=[hash[user_id]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, Orders, 
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
+   +- Exchange(distribution=[hash[user_id]])
+      +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
 ]]>
     </Resource>
   </TestCase>
