lincoln-lil commented on code in PR #27191:
URL: https://github.com/apache/flink/pull/27191#discussion_r2503407176


##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.
+
+To address this, we have introduced a new delta join operator. The core idea 
is to leverage a bidirectional lookup join approach to reuse data from source 
tables, replacing the state required by regular joins. Compared to traditional 
regular joins, delta joins significantly reduce the state size, improve the 
stability of the job, and also decrease the demand for computational resources.

Review Comment:
   "To mitigate these challenges, Flink introduces the delta join operator. The 
key idea is to replace the large state maintained by regular joins with a 
bidirectional lookup-based join that directly reuses data from the source 
tables. Compared to traditional regular joins, delta joins substantially reduce 
state size, enhance job stability, and lower overall resource consumption." 
   
   or a shorter version:
   "The delta join operator addresses this by using a bidirectional lookup 
approach that reuses data from the source tables instead of storing it in 
state. This design greatly reduces state size, improves performance stability, 
and minimizes resource consumption." 



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.
+
+To address this, we have introduced a new delta join operator. The core idea 
is to leverage a bidirectional lookup join approach to reuse data from source 
tables, replacing the state required by regular joins. Compared to traditional 
regular joins, delta joins significantly reduce the state size, improve the 
stability of the job, and also decrease the demand for computational resources.
+
+This feature is currently enabled by default and regular join will be 
optimized into delta join when the following conditions are simultaneously met:
+
+1. The sql pattern satisfies the optimization criteria. For details, please 
refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" 
>}}#supported-features-and-limitations)
+2. The external storage system of the source table provides index information 
for fast querying for delta joins. Currently, [Apache 
Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has 
provided index information at the table level for Flink, allowing such tables 
to be used as source tables for delta joins. Please refer to the [Fluss 
documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support)
 for more details.
+
+### Working Principle
+
+In Flink, regular joins require completely storing incoming data from both 
sides in the state, matching that data when the opposite side's data arrives. 
In contrast, delta joins utilize the indexing capabilities provided by external 
storage systems to convert the behavior of querying state data into efficient 
queries against data in the external storage system using index keys. This 
approach avoids the need for duplicate storage of the same data in both the 
external storage system and the state.
+
+{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
+
+### Important Configurations
+
+Currently, the optimization for delta joins is enabled by default. You can 
disable this feature manually by setting the following configuration. Please 
see [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page 
for more details.

Review Comment:
   nit: "Delta join optimization is enabled by default.
   You can disable it manually using the following configuration option:
   ```sql
   SET 'table.optimizer.delta-join.strategy' = 'NONE';
   ```
   Please see ...
   "



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.
+
+To address this, we have introduced a new delta join operator. The core idea 
is to leverage a bidirectional lookup join approach to reuse data from source 
tables, replacing the state required by regular joins. Compared to traditional 
regular joins, delta joins significantly reduce the state size, improve the 
stability of the job, and also decrease the demand for computational resources.
+
+This feature is currently enabled by default and regular join will be 
optimized into delta join when the following conditions are simultaneously met:

Review Comment:
   "This feature is enabled by default. A regular join will be automatically 
optimized into a delta join when all of the following conditions are met:"



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.

Review Comment:
   Nit: You might consider merging the first two paragraphs into a single, 
shorter one for clarity, e.g., "In streaming jobs, regular joins keep all 
historical data from both inputs to ensure accuracy. Over time, this causes the 
state to grow continuously, increasing resource usage and impacting stability."



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.
+
+To address this, we have introduced a new delta join operator. The core idea 
is to leverage a bidirectional lookup join approach to reuse data from source 
tables, replacing the state required by regular joins. Compared to traditional 
regular joins, delta joins significantly reduce the state size, improve the 
stability of the job, and also decrease the demand for computational resources.
+
+This feature is currently enabled by default and regular join will be 
optimized into delta join when the following conditions are simultaneously met:
+
+1. The sql pattern satisfies the optimization criteria. For details, please 
refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" 
>}}#supported-features-and-limitations)
+2. The external storage system of the source table provides index information 
for fast querying for delta joins. Currently, [Apache 
Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has 
provided index information at the table level for Flink, allowing such tables 
to be used as source tables for delta joins. Please refer to the [Fluss 
documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support)
 for more details.
+
+### Working Principle
+
+In Flink, regular joins require completely storing incoming data from both 
sides in the state, matching that data when the opposite side's data arrives. 
In contrast, delta joins utilize the indexing capabilities provided by external 
storage systems to convert the behavior of querying state data into efficient 
queries against data in the external storage system using index keys. This 
approach avoids the need for duplicate storage of the same data in both the 
external storage system and the state.

Review Comment:
   Maybe we can split into two paragraphs here, e.g., "In Flink, regular joins 
store all incoming records from both input sides in the state to ensure that 
future records can be matched correctly when corresponding data arrives from 
the opposite side.
   
   In contrast, delta joins leverage the indexing capabilities of external 
storage systems. Instead of performing state lookups, delta joins issue 
efficient index-based queries directly against the external storage to retrieve 
matching records. This approach eliminates redundant data storage between the 
Flink state and the external system." 



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.

Review Comment:
   How about "In streaming jobs, regular joins maintain the complete historical 
data from both input streams to ensure accurate computation results. When a new 
record arrives, the operator queries the state of the opposite side to find 
matching records for output and then updates its own state accordingly." ?



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.
+
+To address this, we have introduced a new delta join operator. The core idea 
is to leverage a bidirectional lookup join approach to reuse data from source 
tables, replacing the state required by regular joins. Compared to traditional 
regular joins, delta joins significantly reduce the state size, improve the 
stability of the job, and also decrease the demand for computational resources.
+
+This feature is currently enabled by default and regular join will be 
optimized into delta join when the following conditions are simultaneously met:
+
+1. The sql pattern satisfies the optimization criteria. For details, please 
refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" 
>}}#supported-features-and-limitations)
+2. The external storage system of the source table provides index information 
for fast querying for delta joins. Currently, [Apache 
Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has 
provided index information at the table level for Flink, allowing such tables 
to be used as source tables for delta joins. Please refer to the [Fluss 
documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support)
 for more details.
+
+### Working Principle
+
+In Flink, regular joins require completely storing incoming data from both 
sides in the state, matching that data when the opposite side's data arrives. 
In contrast, delta joins utilize the indexing capabilities provided by external 
storage systems to convert the behavior of querying state data into efficient 
queries against data in the external storage system using index keys. This 
approach avoids the need for duplicate storage of the same data in both the 
external storage system and the state.
+
+{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
+
+### Important Configurations
+
+Currently, the optimization for delta joins is enabled by default. You can 
disable this feature manually by setting the following configuration. Please 
see [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page 
for more details.
+
+```sql
+SET 'table.optimizer.delta-join.strategy' = 'NONE';
+```
+
+Additionally, you can adjust the performance of delta joins by configuring the 
following configurations. Please see [Configuration]({{< ref 
"docs/dev/table/config" >}}#execution-options) page for more details.

Review Comment:
   nit: "To fine-tune the performance of delta joins, you can also configure 
the following parameters.
   See ..."
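   
   As a rough sketch of what such a tuning block could look like: the option 
names are the ones listed in this PR, while the values are placeholders chosen 
only for illustration (not defaults or recommendations), and the value types 
are assumed to be a boolean and numeric sizes.
   ```sql
   -- Illustrative values only; check the Configuration page for real defaults.
   SET 'table.exec.delta-join.cache-enabled' = 'true';
   -- Assumed to be entry counts for the per-side caches.
   SET 'table.exec.delta-join.left.cache-size' = '10000';
   SET 'table.exec.delta-join.right.cache-size' = '10000';
   ```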



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.
+
+To address this, we have introduced a new delta join operator. The core idea 
is to leverage a bidirectional lookup join approach to reuse data from source 
tables, replacing the state required by regular joins. Compared to traditional 
regular joins, delta joins significantly reduce the state size, improve the 
stability of the job, and also decrease the demand for computational resources.
+
+This feature is currently enabled by default and regular join will be 
optimized into delta join when the following conditions are simultaneously met:
+
+1. The sql pattern satisfies the optimization criteria. For details, please 
refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" 
>}}#supported-features-and-limitations)
+2. The external storage system of the source table provides index information 
for fast querying for delta joins. Currently, [Apache 
Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has 
provided index information at the table level for Flink, allowing such tables 
to be used as source tables for delta joins. Please refer to the [Fluss 
documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support)
 for more details.
+
+### Working Principle
+
+In Flink, regular joins require completely storing incoming data from both 
sides in the state, matching that data when the opposite side's data arrives. 
In contrast, delta joins utilize the indexing capabilities provided by external 
storage systems to convert the behavior of querying state data into efficient 
queries against data in the external storage system using index keys. This 
approach avoids the need for duplicate storage of the same data in both the 
external storage system and the state.
+
+{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
+
+### Important Configurations
+
+Currently, the optimization for delta joins is enabled by default. You can 
disable this feature manually by setting the following configuration. Please 
see [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page 
for more details.
+
+```sql
+SET 'table.optimizer.delta-join.strategy' = 'NONE';
+```
+
+Additionally, you can adjust the performance of delta joins by configuring the 
following configurations. Please see [Configuration]({{< ref 
"docs/dev/table/config" >}}#execution-options) page for more details.
+
+- `table.exec.delta-join.cache-enabled`
+- `table.exec.delta-join.left.cache-size`
+- `table.exec.delta-join.right.cache-size`
+
+### Supported Features and Limitations
+
+Delta joins are continuously evolving, and supports the following features 
currently.
+
+1. Support INSERT-ONLY tables as source tables for delta join.

Review Comment:
   nit: "1. Support for **INSERT-only** tables as source tables.
   2. Support for **CDC** tables **without DELETE operations** as source tables.
   3. Support for **projection** and **filter** operations between the source 
and the delta join.
   4. Support for **caching** within the delta join operator."



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -368,3 +368,52 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In stream jobs, regular joins store all historical data from both sides of the 
input to ensure the accuracy of the computation results. When an input record 
is received, regular joins query the state of the other side to find matching 
records to output, while simultaneously updating its own state.
+However, as the job runs for a long term and the input data increases, the 
state of regular joins will gradually grow larger. This may lead to 
computational resources becoming a bottleneck, impacting the overall 
performance of the job and potentially causing a series of stability issues.
+
+To address this, we have introduced a new delta join operator. The core idea 
is to leverage a bidirectional lookup join approach to reuse data from source 
tables, replacing the state required by regular joins. Compared to traditional 
regular joins, delta joins significantly reduce the state size, improve the 
stability of the job, and also decrease the demand for computational resources.
+
+This feature is currently enabled by default and regular join will be 
optimized into delta join when the following conditions are simultaneously met:
+
+1. The sql pattern satisfies the optimization criteria. For details, please 
refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" 
>}}#supported-features-and-limitations)
+2. The external storage system of the source table provides index information 
for fast querying for delta joins. Currently, [Apache 
Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has 
provided index information at the table level for Flink, allowing such tables 
to be used as source tables for delta joins. Please refer to the [Fluss 
documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support)
 for more details.
+
+### Working Principle
+
+In Flink, regular joins require completely storing incoming data from both 
sides in the state, matching that data when the opposite side's data arrives. 
In contrast, delta joins utilize the indexing capabilities provided by external 
storage systems to convert the behavior of querying state data into efficient 
queries against data in the external storage system using index keys. This 
approach avoids the need for duplicate storage of the same data in both the 
external storage system and the state.
+
+{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
+
+### Important Configurations
+
+Currently, the optimization for delta joins is enabled by default. You can 
disable this feature manually by setting the following configuration. Please 
see [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page 
for more details.
+
+```sql
+SET 'table.optimizer.delta-join.strategy' = 'NONE';
+```
+
+Additionally, you can adjust the performance of delta joins by configuring the 
following configurations. Please see [Configuration]({{< ref 
"docs/dev/table/config" >}}#execution-options) page for more details.
+
+- `table.exec.delta-join.cache-enabled`
+- `table.exec.delta-join.left.cache-size`
+- `table.exec.delta-join.right.cache-size`
+
+### Supported Features and Limitations
+
+Delta joins are continuously evolving, and supports the following features 
currently.
+
+1. Support INSERT-ONLY tables as source tables for delta join.
+2. Support CDC tables without DELETE as source tables for delta join.
+3. Support project and filter between source and delta join.
+4. Support cache in delta join.
+
+However, delta joins has the following limitations. Any job containing one of 
these conditions cannot be optimized into a delta join.

Review Comment:
   nit: "However, delta joins also have several **limitations**.
   A job that violates any of the following constraints **cannot** be optimized 
into a delta join:
   
   1. The **index key** of the table must be included in the join’s 
**equivalence conditions**.
   2. Only **INNER JOIN** is currently supported.
   3. The **downstream operator** must be able to handle **duplicate changes**, 
such as a sink operating in **UPSERT mode** without `upsertMaterialize`.
   4. When consuming a **CDC stream**, the **join key** must be part of the 
**primary key**.
   5. When consuming a **CDC stream**, all **filters** must be applied on the 
**upsert key**.
   6. **Non-deterministic functions** are not allowed in filters or 
projections."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
