GitHub user xccui commented on the issue:
https://github.com/apache/flink/pull/5094
Hi @hequn8128, let me try to explain this.
1. In the current implementation, the join process relies only on the cached
rows, not on the watermarks. Specifically, when it receives a record, the join
function only checks whether the opposite cache contains qualifying rows,
regardless of lateness. Thus, if the cache ***is not cleaned up in
time***, outdated results will be emitted.
2. Strictly speaking, the value for holding back watermarks should be
reported dynamically by the join function at runtime. The current
implementation temporarily uses a static value (`MaxOutputDelay`) instead. In
other words, the hold-back value should be determined by the cached rows,
rather than the cache size being determined by `MaxOutputDelay`.
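
To make both points concrete, here is a toy Python model. It is only a sketch and not Flink's actual code: `ToyTimeBoundedJoin`, `bound`, `clean_up` and `hold_back` are made-up names, and the symmetric `bound` and the "oldest cached row" rule for the hold-back are simplifying assumptions of mine.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTimeBoundedJoin:
    """Toy model (hypothetical): joins rows whose timestamps differ by <= bound."""
    bound: int
    left_cache: list = field(default_factory=list)   # rows are (timestamp, value)
    right_cache: list = field(default_factory=list)

    def process(self, side, ts, value):
        """Cache the row, then join it against the opposite cache.

        There is no watermark check here: any cached row within `bound` matches,
        no matter how late the incoming row is.
        """
        if side == "left":
            own, other = self.left_cache, self.right_cache
        else:
            own, other = self.right_cache, self.left_cache
        own.append((ts, value))
        row = (ts, value)
        return [(row, o) if side == "left" else (o, row)
                for o in other if abs(ts - o[0]) <= self.bound]

    def clean_up(self, watermark):
        """Drop rows that can no longer match any on-time row."""
        limit = watermark - self.bound
        self.left_cache = [r for r in self.left_cache if r[0] >= limit]
        self.right_cache = [r for r in self.right_cache if r[0] >= limit]

    def hold_back(self, watermark):
        """Assumed dynamic hold-back: how far the oldest cached row trails the watermark."""
        cached = [ts for ts, _ in self.left_cache + self.right_cache]
        return max(0, watermark - min(cached)) if cached else 0


join = ToyTimeBoundedJoin(bound=5)
join.process("left", 10, "l10")

# The watermark has reached 100, but the cache was not cleaned up in time:
# a very late right row still finds a match, so an outdated result is emitted.
stale = join.process("right", 12, "r12")
lag = join.hold_back(100)        # driven by the cached rows, not by a fixed delay

# Once the cache is cleaned, the same kind of late row no longer joins.
join.clean_up(100)
lag_after = join.hold_back(100)
fresh = join.process("right", 13, "r13")

print(stale, lag, lag_after, fresh)
```

In this model the stale match appears only because `clean_up` ran too late, and the lag reported by `hold_back` depends on what is actually in the caches, which a single static value cannot track.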
Hope that helps.
Best, Xingcan
---