> the change might not be supposed for the downstream of the job which requires
> details of changelog

Could you elaborate on this a bit? I've never come across this kind of
requirement before, and I'm curious what scenario requires it.

shuai xu <xushuai...@gmail.com> wrote on Thu, Jan 11, 2024 at 13:08:
>
> Thanks for your response, Benchao.
>
> Here is my thought on the newly added option.
> Users' current jobs are running on a version without minibatch join. If the
> existing mini-batch option were reused to enable minibatch join, then when
> users' jobs are migrated to the new version, the internal behavior of the
> join operation within those jobs would change. Although the changelog
> emitted by the Join operator is eventually consistent, the change might not
> be supposed for the downstream of the job which requires details of
> changelog. This newly added option also follows the precedent of
> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>
> As for the implementation, the new operator shares the state of the original
> operator; it merely adds a minibatch buffer that stores records so they can
> be optimized. The storage remains consistent, and only the computation logic
> is modified slightly.
>
> Best,
> Xu Shuai
>
> > On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
> >
> > Thanks, Shuai, for driving this. Mini-batch join is a very useful
> > optimization, +1 for the general idea.
> >
> > Regarding the configuration
> > "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
> > necessary. The changelog emitted by the Join operator is eventually
> > consistent, so there is not much difference between the original Join
> > and mini-batch Join in this respect. Besides, introducing more options
> > makes things more complex for users and harder to understand and
> > maintain, which we should be careful about.
> >
> > One thing about the implementation: could you make the new operator
> > share the same state definition with the original one?
> >
> > shuai xu <xushuai...@gmail.com> wrote on Wed, Jan 10, 2024 at 21:23:
> >>
> >> Hi devs,
> >>
> >> I’d like to start a discussion on FLIP-415: Introduce a new join operator
> >> to support minibatch [1].
> >>
> >> Currently, when performing cascading joins in Flink, there is a pain
> >> point of record amplification: every record a join operator receives
> >> triggers the join process. However, if a +I record and a -D record
> >> match, they could be folded, saving two join processes. Besides, in an
> >> outer join a -U/+U pair may produce 4 output records, two of which are
> >> redundant.
> >>
> >> To address this issue, this FLIP introduces a new
> >> MiniBatchStreamingJoinOperator that processes records in batches, which
> >> reduces the number of redundant output messages and avoids unnecessary
> >> join processes.
> >> A new option is added to control the operator so that existing jobs are
> >> not affected.
> >>
> >> Please find more details in the FLIP wiki document [1]. Looking
> >> forward to your feedback.
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
> >>
> >> Best,
> >> Xu Shuai
> >
> > --
> >
> > Best,
> > Benchao Li
>
--
Best,
Benchao Li
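
The +I/-D folding described in the FLIP-415 announcement in this thread can be illustrated with a minimal Python sketch. It assumes that all records in one mini-batch share a join key, that rows compare by value, and that a retraction (-U/-D) cancels the most recent identical accumulation (+I/+U) buffered in the same batch. `RowKind` borrows the names of Flink's row kinds, but `fold_mini_batch` is a hypothetical helper, not code from MiniBatchStreamingJoinOperator.

```python
from enum import Enum
from typing import Hashable, List, Tuple


class RowKind(Enum):
    """Changelog row kinds, named after Flink's short strings."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


# +I and +U add a row; -U and -D retract one.
ACCUMULATE = {RowKind.INSERT, RowKind.UPDATE_AFTER}

Change = Tuple[RowKind, Hashable]


def fold_mini_batch(records: List[Change]) -> List[Change]:
    """Hypothetical folding of one buffered mini-batch before the join.

    A retraction cancels the most recent earlier accumulation of an
    identical row in the same batch, so neither record has to be joined.
    All other records are kept in arrival order.
    """
    pending: List[Change] = []
    for kind, row in records:
        if kind in ACCUMULATE:
            pending.append((kind, row))
            continue
        # Search backwards for a matching accumulation to cancel.
        for i in range(len(pending) - 1, -1, -1):
            p_kind, p_row = pending[i]
            if p_kind in ACCUMULATE and p_row == row:
                del pending[i]
                break
        else:
            # Nothing to cancel: the retraction refers to a row from an
            # earlier batch, so it still has to go through the join.
            pending.append((kind, row))
    return pending


batch = [
    (RowKind.INSERT, ("k1", 10)),
    (RowKind.DELETE, ("k1", 10)),        # cancels the +I above
    (RowKind.UPDATE_BEFORE, ("k2", 5)),  # retracts a row from an earlier batch
    (RowKind.UPDATE_AFTER, ("k2", 6)),
]
folded = fold_mini_batch(batch)
print([(k.value, r) for k, r in folded])
# [('-U', ('k2', 5)), ('+U', ('k2', 6))]
print(f"join invocations: {len(batch)} -> {len(folded)}")
# join invocations: 4 -> 2
```

Here the -U/+U pair for `k2` is not cancelled because the rows differ, but both halves reach the join in the same batch. In an outer join, that is what would let a batched operator emit the update as a single -U/+U pair instead of the four records mentioned in the announcement.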