I see. Thanks for explaining.

It still seems like the `enforceProcessing` variable isn't strictly necessary; 
it just saves calls to `partitionGroup.allPartitionsBuffered()` and 
`partitionGroup.numBuffered()`, plus the comparison 
`now - lastEnforcedProcessingTime > maxTaskIdleMs`. These are all just cached 
field lookups, though, so I don't know if the performance boost is worth the 
added algorithmic complexity.

Regarding `lastEnforcedProcessingTime`, consider this scenario:
```
maxTaskIdleMs := 2
lastEnforcedProcessingTime := 0 (init)
now=0; allPartitionsBuffered=true => isProcessable:=true
now=1; allPartitionsBuffered=true => isProcessable:=true
now=2; allPartitionsBuffered=true => isProcessable:=true
now=3; allPartitionsBuffered=false => 
  // 3 - 0 > 2, so we force processing
  lastEnforcedProcessingTime:=3
  isProcessable:=true
```

Two things to note here:
1. The expression should probably be 
`now - lastEnforcedProcessingTime >= maxTaskIdleMs` (with `>=` instead of `>`); 
otherwise you'll wait at least one extra ms _beyond_ the purported "max task 
idle time".
2. In the scenario above, we said we want to wait *2 ms* before forcing 
processing, but we actually force processing *immediately*: at `now=3`, only 
1 ms after we last processed at `now=2`. To fix this, we should be comparing 
against `lastProcessingTime`, which we should set every time we process.
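
Roughly what I have in mind for those two points, as a minimal sketch. `IdleGate`, `recordProcessed`, and the `startTime` constructor argument are made-up names for illustration, not the actual code in this PR, and I'm assuming we only want to force processing when at least one record is buffered.

```java
// Hypothetical sketch: class and method names are illustrative only.
final class IdleGate {
    private final long maxTaskIdleMs;
    // Updated every time we process, whether forced or not.
    private long lastProcessingTime;

    IdleGate(final long maxTaskIdleMs, final long startTime) {
        this.maxTaskIdleMs = maxTaskIdleMs;
        this.lastProcessingTime = startTime;
    }

    boolean isProcessable(final long now,
                          final boolean allPartitionsBuffered,
                          final int numBuffered) {
        if (allPartitionsBuffered) {
            return true;
        }
        // Assumption: nothing to force if no partition has data.
        // Note ">=" so we wait exactly maxTaskIdleMs, not one ms longer.
        return numBuffered > 0 && now - lastProcessingTime >= maxTaskIdleMs;
    }

    // The caller invokes this after each processed record.
    void recordProcessed(final long now) {
        lastProcessingTime = now;
    }
}
```

Replaying the scenario above with `maxTaskIdleMs = 2` and processing recorded at `now=0`, `1`, and `2`: the check at `now=3` returns false (only 1 ms idle), and the check at `now=4` returns true, so we idle for the full 2 ms before forcing.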

[ Full content available at: https://github.com/apache/kafka/pull/5428 ]
