I see. Thanks for explaining. It still seems like the `enforceProcessing` variable isn't strictly necessary, it just saves calls to `partitionGroup.allPartitionsBuffered()`, `partitionGroup.numBuffered()`, and the comparison `now - lastEnforcedProcessingTime > maxTaskIdleMs`. These are all just cached field lookups, though, so I don't know if the performance boost is worth the algorithmic complexity.
Regarding `lastEnforcedProcessingTime`, consider this scenario. ``` maxTaskIdleMs := 2 lastEnforcedProcessingTime := 0 (init) now=0; allPartitionsBuffered=true => isProcessable:=true now=1; allPartitionsBuffered=true => isProcessable:=true now=2; allPartitionsBuffered=true => isProcessable:=true now=3; allPartitionsBuffered=false => // 3 - 0 > 2, so we force processing lastEnforcedProcessingTime:=3 isProcessable:=true ``` Two things to note here: 1. the expression should probably be `now - lastEnforcedProcessingTime >= maxTaskIdleMs` (with `>=` instead of `>`), otherwise you'll wait at least one extra ms _beyond_ the purported "max task idle time". 2. In the scenario above, we said we want to wait *2 ms* before forcing processing, but we actually force processing *immediately*. To fix this, we should be comparing against `lastProcessingTime`, which we should set every time we process. [ Full content available at: https://github.com/apache/kafka/pull/5428 ] This message was relayed via gitbox.apache.org for [email protected]
