pnowojski commented on PR #26051:
URL: https://github.com/apache/flink/pull/26051#issuecomment-2637506032

   @Zakelly / @xuyangzhong  I'm struggling with some failures in tests. 
`DeduplicateITCase#testFirstRowOnRowtime` fails for mini batch in a very weird 
way.
   ```
   [ERROR]   Run 3: DeduplicateITCase.testFirstRowOnRowtime:232 
   expected: List(
   +I(1,1,Hi,1970-01-01T00:00:00.001), 
   +I(2,3,I am fine.,1970-01-01T00:00:00.003), 
   +I(3,5,Comment#2,1970-01-01T00:00:00.005), 
   +I(4,4,Comment#3,1970-01-01T00:00:00.004), 
   +U(3,4,Comment#2,1970-01-01T00:00:00.004), 
   -U(3,5,Comment#2,1970-01-01T00:00:00.005)) # <<<<<<<<<<< this is missing
    but was: ArrayBuffer(
   +I(1,1,Hi,1970-01-01T00:00:00.001), 
   +I(2,3,I am fine.,1970-01-01T00:00:00.003), 
   +I(3,5,Comment#2,1970-01-01T00:00:00.005), 
   +I(4,4,Comment#3,1970-01-01T00:00:00.004), 
   +U(3,4,Comment#2,1970-01-01T00:00:00.004))
    ```
   The issue is that in the test with mini batch enabled, 
`StreamExecDeduplicate` ends up with 
`StreamExecDeduplicate#generateUpdateBefore` set to `false`.
   
   I've tracked the problem down to some inconsistency in the configuration. In 
`FlinkChangelogModeInferenceProgram` I'm doing a check if the deduplication can 
be executed as insert only (line 238 
https://github.com/apache/flink/pull/26051/files#diff-ddf95eb3949ab889e8e3bccdacb9f57553aac06657574fd6316ca17dd48321a1R238
 ). One of the conditions is that mini batch must be disabled. 
   
   But the problem is that the value of `table.exec.mini-batch.enabled` is 
overwritten and inconsistent in the `TableConfig`:
   
![2025_02_05_0tv_Kleki](https://github.com/user-attachments/assets/9a853c9b-7f85-4673-9cd9-a6cfad4c79ec)
   note `configuration` and `rootConfiguration` having different values. 
   
   I've tracked it down to this code: 
https://github.com/apache/flink/blob/b1544e4e513d2b75b350c20dbb1c17a8232c22fd/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala#L106
   
   introduced here: https://github.com/apache/flink/pull/23470
   
   Because of that, during planning we encounter different configuration value 
for the `mini-batch.enabled` vs during execution. So the planner code decides 
to use `insert-only` code path, which doesn't requires `generateUpdateBefore` 
but but during runtime it's still using mini batch versions of the operators.
   
   First of all it seems to me like this code in 
`StreamCommonSubGraphBasedOptimizer` is bugged, isn't it?
   Secondly, that raised a question for me. Is it expected that a compiled plan 
created with mini-batch DISABLED, can it be later executed with mini-batch 
ENABLED? Or the other way?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to