Hello folks,
We have a use case with a few stream-stream joins that require us to join a very large table with a much smaller table. Given the nature of the dataset, a typical join that uses hash distribution to co-locate the records for each join key ends up heavily skewed: a few task slots get almost all of the work instead of it being spread evenly across the slots.
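To make the skew concrete, here is a minimal Python sketch. It assumes a hypothetical input in which one hot key (`shop-1`) owns 90% of the records, and it uses `zlib.crc32` modulo the parallelism as a simplified stand-in for Flink's key-group hashing (not the actual partitioner). Because every record with the same key lands on the same subtask, one subtask receives at least the 9,000 hot-key records no matter how high the parallelism is.
```python
from collections import Counter
import zlib

PARALLELISM = 8  # hypothetical number of join subtasks


def hash_partition(key: str, parallelism: int = PARALLELISM) -> int:
    """Map a join key to a subtask index.

    Simplified stand-in for Flink's key-group hashing, not the real partitioner.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def load_per_subtask(keys, parallelism: int = PARALLELISM):
    """Count how many records each subtask receives under hash distribution."""
    counts = Counter(hash_partition(k, parallelism) for k in keys)
    return [counts.get(i, 0) for i in range(parallelism)]


# Hypothetical skewed input: "shop-1" owns 9,000 of the 10,000 records.
records = ["shop-1"] * 9_000 + [f"shop-{i}" for i in range(2, 1_002)]
loads = load_per_subtask(records)
print("records per subtask:", loads)
print("max/avg load ratio:", max(loads) / (sum(loads) / len(loads)))
```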
We’ve internally implemented a salting-based solution in which we salt the smaller table and join it with the larger table. While this works at the POC stage, we’d like to rely on Flink as much as possible for such a join. Given the nature of the problem, a broadcast join seems like a good fit in theory: if the smaller table is replicated to every parallel join instance, the larger table no longer has to be partitioned by the skewed join key.
We explored the query hints supported in Flink, starting with FLIP-229 (Join Hint for Flink SQL Batch Job) <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job> and FLIP-204 (Hash Lookup Join) <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>. Currently, when creating the physical plan for a streaming job, the optimizer doesn't consider the Broadcast hint in the `Exchange` step of the join. Notice that the query AST (Abstract Syntax Tree) does contain the broadcast hint parsed from the query:
```sql
...
... joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
...
```
However, the Flink optimizer ignores the hint and still represents the join as a regular `hash` join in the `Exchange` step:
```sql
...
... :- Exchange(distribution=[hash[shop_id, publication_id, price_list_id]])
...
```
In Flink's `StreamExecExchange`, the translation only happens via the `HASH` distribution type <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>, whereas in `BatchExecExchange` it can happen via several distribution types, including `HASH` and `BROADCAST` <https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>.
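For comparison, here is a minimal in-memory Python sketch of both strategies. It assumes the common salting variant in which each larger-side row gets a salt and each smaller-side row is replicated once per salt value; it is not a description of our internal implementation. `PARALLELISM`, `num_salts`, and all helper names are hypothetical, `zlib.crc32` again stands in for Flink's hashing, and the in-memory lists stand in for the keyed state a streaming join would keep. Both strategies produce the same join result; they differ in how the larger side is spread across subtasks.
```python
from collections import defaultdict
import zlib

PARALLELISM = 8  # hypothetical number of join subtasks


def partition(key, parallelism=PARALLELISM):
    """Simplified stand-in for hash distribution (not Flink's key-group hashing)."""
    return zlib.crc32(repr(key).encode("utf-8")) % parallelism


def reference_join(large, small):
    """Plain in-memory hash join, used only to check the two strategies below."""
    lookup = defaultdict(list)
    for key, value in small:
        lookup[key].append(value)
    return sorted((k, p, v) for k, p in large for v in lookup[k])


def salted_join(large, small, num_salts):
    """Salting: large-side rows get a salt in [0, num_salts); every small-side
    row is replicated once per salt, and rows are distributed by (key, salt)."""
    large_by_task, small_by_task = defaultdict(list), defaultdict(list)
    for i, (key, payload) in enumerate(large):
        salted = (key, i % num_salts)  # round-robin salt; a random salt also works
        large_by_task[partition(salted)].append((salted, payload))
    for key, value in small:
        for salt in range(num_salts):
            salted = (key, salt)
            small_by_task[partition(salted)].append((salted, value))
    results, loads = [], []
    for task in range(PARALLELISM):
        # Each subtask joins only on the full salted key it received.
        lookup = defaultdict(list)
        for salted, value in small_by_task[task]:
            lookup[salted].append(value)
        for salted, payload in large_by_task[task]:
            results.extend((salted[0], payload, v) for v in lookup[salted])
        loads.append(len(large_by_task[task]))
    return sorted(results), loads


def broadcast_join(large, small):
    """Broadcast: every subtask would hold a full copy of the small side
    (simulated here by one shared lookup table), so the large side can be
    spread round-robin regardless of its join key."""
    lookup = defaultdict(list)
    for key, value in small:
        lookup[key].append(value)
    results, loads = [], [0] * PARALLELISM
    for i, (key, payload) in enumerate(large):
        loads[i % PARALLELISM] += 1  # round-robin, ignoring the skewed key
        results.extend((key, payload, v) for v in lookup[key])
    return sorted(results), loads


# Hypothetical skewed data: "shop-1" owns 9,000 of the 10,000 large-side rows.
large = [("shop-1", i) for i in range(9_000)] + [(f"shop-{k}", k) for k in range(2, 1_002)]
small = [(f"shop-{k}", f"price-list-{k}") for k in range(1, 1_002)]

salted_results, salted_loads = salted_join(large, small, num_salts=16)
broadcast_results, broadcast_loads = broadcast_join(large, small)
print("salted loads:   ", salted_loads)
print("broadcast loads:", broadcast_loads)
```
The trade-off shows up in the replication factor: salting copies the smaller side `num_salts` times and still depends on how the salted keys hash, while broadcasting copies it once per subtask and lets the larger side be spread evenly (here, exactly 1,250 rows per subtask).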
Quoting this Flink mailing list discussion <https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> on the FLIP that introduced the Broadcast join hint for batch SQL:
> But currently, only in batch the optimizer has different Join strategies for Join and there is no choice of join strategies in the stream. The join hints listed in the current flip should be ignored (maybe can be warned) in streaming mode. When in the future the stream mode has the choice of join strategies, I think that's a good time to discuss that the join hint can affect the streaming SQL.

What do you folks think about supporting a Broadcast hint for streaming SQL that lets users choose the kind of distribution they want for a dataset? Happy to learn more about this and hopefully implement it, if it doesn’t sound like a terrible idea.
Thanks,
Prabhjot
