This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new 6c91d65fb0c [FLINK-38073][docs] Add documentation for the MultiJoin 
operator (#26775)
6c91d65fb0c is described below

commit 6c91d65fb0cfad5b197941566eac0a0b85eebc5d
Author: Gustavo de Morais <gdemor...@confluent.io>
AuthorDate: Thu Jul 17 02:58:11 2025 +0100

    [FLINK-38073][docs] Add documentation for the MultiJoin operator (#26775)
    
    * [FLINK-38073][docs] Add documentation for the MultiJoin operator
    
    * [FLINK-38073][docs] Complement with TaskManager/JobManager/Host specs
    
    (cherry picked from commit a7aabb8c8a386580f0be3e4681971e834d53523c)
---
 docs/content/docs/dev/table/sql/queries/joins.md   |   3 +
 docs/content/docs/dev/table/tuning.md              |  66 +++++++++++++++++++++
 .../fig/table-streaming/multijoin_operator.png     | Bin 0 -> 812521 bytes
 3 files changed, 69 insertions(+)

diff --git a/docs/content/docs/dev/table/sql/queries/joins.md 
b/docs/content/docs/dev/table/sql/queries/joins.md
index 7a89da0d35b..b0b5471000b 100644
--- a/docs/content/docs/dev/table/sql/queries/joins.md
+++ b/docs/content/docs/dev/table/sql/queries/joins.md
@@ -83,6 +83,9 @@ FULL OUTER JOIN Product
 ON Orders.product_id = Product.id
 ```
 
+### Multiple Regular Joins
+If a query chains multiple regular joins that generate a lot of state and 
perform poorly, consider using the `MultiJoin` operator. See 
[tuning multiple regular joins]({{< ref "docs/dev/table/tuning" 
>}}#multiple-regular-joins) for details.
+
 Interval Joins
 --------------
 
diff --git a/docs/content/docs/dev/table/tuning.md 
b/docs/content/docs/dev/table/tuning.md
index d6c6c2e9f2c..ae372f08296 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -302,3 +302,69 @@ The execution of mini-batch join operator are as shown in 
the figure below.
 MiniBatch optimization is disabled by default for regular join. In order to 
enable this optimization, you should set options 
`table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and 
`table.exec.mini-batch.size`. Please see [configuration]({{< ref 
"docs/dev/table/config" >}}#execution-options) page for more details.
 
 {{< top >}}
+
+## Multiple Regular Joins
+
+{{< label Streaming >}}
+
+Streaming Flink jobs with multiple non-temporal regular joins often experience 
operational instability and performance degradation due to large state sizes. 
This is often because the intermediate state created by a chain of joins is 
much larger than the input state itself. In Flink 2.1, we introduce a new 
multi-join operator, an optimization designed to significantly reduce state 
size and improve performance for join pipelines that involve record 
amplification and large intermediate stat [...]
+
+In most joins, a significant portion of processing time is spent fetching 
records from the state. The efficiency of the MultiJoin operator largely 
depends on the size of this intermediate state. In a common scenario where a 
pipeline experiences record amplification, meaning each join produces more 
records than the previous one, the MultiJoin operator is more efficient. 
This is because it keeps the state that the operator interacts with much 
smaller, leading to a more stable operat [...]
+
+### The MultiJoin Operator
+The main benefits of the MultiJoin operator are:
+
+1) Considerably smaller state size due to zero intermediate state.
+2) Improved performance for chained joins with record amplification.
+3) Improved stability: linear state growth with the number of records 
processed, instead of polynomial growth with binary joins.
+
+In addition, pipelines that use MultiJoin instead of binary joins usually 
initialize and recover faster because they have smaller state and fewer 
nodes.
+
+### When to enable the MultiJoin?
+
+If your job has multiple joins that share at least one common join key, and 
you observe that the state of the intermediate joins is larger than that of 
the input sources, consider enabling the MultiJoin operator.
+
+### How to enable the MultiJoin?
+
+To enable this optimization, set the following configuration:
+
+```sql
+SET 'table.optimizer.multi-join.enabled' = 'true';
+```
+
+Important: This feature is currently experimental. Some optimizations are 
still open, and breaking changes might be introduced in this version. Only 
streaming INNER and LEFT joins are currently supported; support for RIGHT joins 
will be added soon. Because records are partitioned by key, at least one key 
must be shared between the join conditions:
+
+- Supported: A JOIN B ON A.key = B.key JOIN C ON A.key = C.key (Partition by 
key)
+- Supported: A JOIN B ON A.key = B.key JOIN C ON B.key = C.key (Partition by 
key via transitivity)
+- Not supported: A JOIN B ON A.key1 = B.key1 JOIN C ON B.key2 = C.key2 (No 
single key allows partitioning A, B, and C together in a single operator. This 
will be split into multiple MultiJoin operators)
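+
+As a minimal sketch of the key-sharing rule above, the statements below enable 
the optimization and request the plan for a three-way join whose conditions 
share one key through transitivity. The tables `Orders`, `Payments`, and 
`Shipments` and their columns are hypothetical and only serve as an 
illustration. Inspecting the `EXPLAIN` output is one way to check whether the 
joins were fused into a single operator.
+
+```sql
+-- Hypothetical tables; each one is assumed to have an `order_id` column.
+SET 'table.optimizer.multi-join.enabled' = 'true';
+
+-- o.order_id = p.order_id and p.order_id = s.order_id, so all three inputs
+-- can be partitioned by `order_id` (shared key via transitivity).
+EXPLAIN
+SELECT o.order_id, p.amount, s.carrier
+FROM Orders o
+         JOIN Payments p ON o.order_id = p.order_id
+         LEFT JOIN Shipments s ON p.order_id = s.order_id;
+```
+
+If the second condition instead joined `Shipments` on a column unrelated to 
`order_id`, the query would fall under the "Not supported" case above.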
+
+### MultiJoin Operator Example - Benchmark
+
+Here's a benchmark of a 10-way join comparing the default binary joins with 
the MultiJoin operator. The first section shows the amount of intermediate 
state, the second shows the number of records processed when the operators 
reach 100% busyness, and the third shows the checkpoints.
+
+{{< img src="/fig/table-streaming/multijoin_operator.png" height="100%" >}}
+
+For this 10-way join above, involving record amplification, we've observed 
significant improvements. Here are some rough numbers:
+
+- Performance: 2x to over 100x increase in processed records when both are at 
100% busyness.
+- State Size: 3x to over 1000x smaller as intermediate state grows.
+
+The total state is always smaller with the MultiJoin operator. In this case, 
the performance is initially the same, but as the intermediate state grows, the 
performance of binary joins degrades while the MultiJoin operator remains 
stable and outperforms them.
+
+This general benchmark for the 10-way join was run with the following 
configuration: 10 upsert Kafka topics, a parallelism of 10, and 1 record per 
second per topic. We used RocksDB with unaligned and incremental 
checkpoints. Each job ran in one TaskManager with 8GB process memory, 1GB 
off-heap memory and 20% network memory. The JobManager had 4GB process memory. 
The host machine had an M1 processor, 32GB RAM and a 1TB SSD. The sink 
uses a blackhole connector so we o [...]
+
+```sql
+INSERT INTO JoinResultsMJ
+SELECT *all fields*
+FROM TenantKafka t
+         LEFT JOIN SuppliersKafka s ON t.tenant_id = s.tenant_id AND ...
+         LEFT JOIN ProductsKafka p ON t.tenant_id = p.tenant_id AND ...
+         LEFT JOIN CategoriesKafka c ON t.tenant_id = c.tenant_id AND ...
+         LEFT JOIN OrdersKafka o ON t.tenant_id = o.tenant_id AND ...
+         LEFT JOIN CustomersKafka cust ON t.tenant_id = cust.tenant_id AND ...
+         LEFT JOIN WarehousesKafka w ON t.tenant_id = w.tenant_id AND ...
+         LEFT JOIN ShippingKafka sh ON t.tenant_id = sh.tenant_id AND ...
+         LEFT JOIN PaymentKafka pay ON t.tenant_id = pay.tenant_id AND ...
+         LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
+```
+
diff --git a/docs/static/fig/table-streaming/multijoin_operator.png 
b/docs/static/fig/table-streaming/multijoin_operator.png
new file mode 100644
index 00000000000..9e471665fe6
Binary files /dev/null and 
b/docs/static/fig/table-streaming/multijoin_operator.png differ
