Akshat-Jain opened a new pull request, #17373:
URL: https://github.com/apache/druid/pull/17373
### Description
With the changes made in #17038, we missed a flow where multiple frames
could be written to the output channel in a single iteration of
`WindowOperatorQueryFrameProcessor#runIncrementally`. This violates the
contract of `runIncrementally()` and leads to the following error: `Channel has
no capacity`
<details>
<summary>Sample stacktrace</summary>
```
2024-10-18T00:06:27,410 WARN [MultiStageQuery-test-controller-client]
org.apache.druid.msq.exec.WorkerImpl - Work failed; stage 2; task
query-dummy-worker0_0; host 123:8080: UnknownError: java.lang.RuntimeException:
org.apache.druid.java.util.common.ISE: Channel has no capacity
java.lang.RuntimeException: org.apache.druid.java.util.common.ISE: Channel
has no capacity
at org.apache.druid.java.util.common.Either.valueOrThrow(Either.java:95)
at
org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:271)
at
org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.run(FrameProcessorExecutor.java:141)
at org.apache.druid.msq.exec.WorkerImpl$2$2.run(WorkerImpl.java:900)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.druid.java.util.common.ISE: Channel has no capacity
at
org.apache.druid.frame.channel.BlockingQueueFrameChannel$Writable.write(BlockingQueueFrameChannel.java:139)
at
org.apache.druid.msq.indexing.CountingWritableFrameChannel.write(CountingWritableFrameChannel.java:50)
at
org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushFrameWriter(WindowOperatorQueryFrameProcessor.java:302)
at
org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.writeRacToFrame(WindowOperatorQueryFrameProcessor.java:262)
at
org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushAllRowsAndCols(WindowOperatorQueryFrameProcessor.java:232)
at
org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.runIncrementally(WindowOperatorQueryFrameProcessor.java:150)
at
org.apache.druid.msq.counters.CpuTimeAccumulatingFrameProcessor.runIncrementally(CpuTimeAccumulatingFrameProcessor.java:66)
at
org.apache.druid.frame.processor.FrameProcessors$1FrameProcessorWithBaggage.runIncrementally(FrameProcessors.java:72)
at
org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:239)
... 5 more
```
</details>
We missed this flow:
1. We read the input channel
2. We call `runAllOpsOnBatch()`
3. If after step (1), the input channel was finished, then the
operatorChain's receiver's completed() method will get called, which would call
`flushAllRowsAndCols()`, hence writing to the output channel.
4. After this, we end up calling `flushAllRowsAndCols()` again, missing the
fact that there's a chance that the input channel might have finished. This
ends up attempting to write another frame to the output channel, causing the
`Channel has no capacity` error.
This PR fixes the above problematic flow by checking if the input channel is
finished. If it's finished, we re-run `runIncrementally()` instead of step 4.
<hr>
This PR has:
- [x] been self-reviewed.
- [ ] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [ ] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]