Akshat-Jain opened a new pull request, #17373:
URL: https://github.com/apache/druid/pull/17373

   ### Description
   
   With the changes made in #17038, we missed a flow where multiple frames could be written to the output channel in a single iteration of `WindowOperatorQueryFrameProcessor#runIncrementally`. This violates the contract of `runIncrementally()` and leads to the following error: `Channel has no capacity`.
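
To make the failure mode concrete, here is a minimal sketch that assumes the output channel can hold only one unread frame at a time. `OneSlotChannelSketch` and its methods are hypothetical names for illustration, not Druid's `BlockingQueueFrameChannel`; only the error message is taken from the report.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a writable frame channel with room for one frame.
// Not Druid's BlockingQueueFrameChannel; it only mimics the capacity check.
final class OneSlotChannelSketch {
    private final Queue<String> buffer = new ArrayDeque<>();

    // Accepts a frame only when the single slot is free.
    void write(String frame) {
        if (!buffer.isEmpty()) {
            throw new IllegalStateException("Channel has no capacity");
        }
        buffer.add(frame);
    }

    // Simulates the downstream reader emptying the slot between iterations.
    String read() {
        return buffer.poll();
    }

    // Two writes in one simulated iteration: the second one is rejected.
    static String writeTwiceInOneIteration() {
        OneSlotChannelSketch channel = new OneSlotChannelSketch();
        channel.write("frame-1");
        try {
            channel.write("frame-2");
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // One write per simulated iteration, with a read in between: both succeed.
    static String writeOncePerIteration() {
        OneSlotChannelSketch channel = new OneSlotChannelSketch();
        channel.write("frame-1");
        channel.read();
        channel.write("frame-2");
        return channel.read();
    }
}
```

Under that assumption, any code path that writes twice before control returns to the executor fails on the second write.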
   
   <details>
   <summary>Sample stacktrace</summary>
   
   ```
   2024-10-18T00:06:27,410 WARN [MultiStageQuery-test-controller-client] org.apache.druid.msq.exec.WorkerImpl - Work failed; stage 2; task query-dummy-worker0_0; host 123:8080: UnknownError: java.lang.RuntimeException: org.apache.druid.java.util.common.ISE: Channel has no capacity
   java.lang.RuntimeException: org.apache.druid.java.util.common.ISE: Channel has no capacity
        at org.apache.druid.java.util.common.Either.valueOrThrow(Either.java:95)
        at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:271)
        at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.run(FrameProcessorExecutor.java:141)
        at org.apache.druid.msq.exec.WorkerImpl$2$2.run(WorkerImpl.java:900)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.druid.java.util.common.ISE: Channel has no capacity
        at org.apache.druid.frame.channel.BlockingQueueFrameChannel$Writable.write(BlockingQueueFrameChannel.java:139)
        at org.apache.druid.msq.indexing.CountingWritableFrameChannel.write(CountingWritableFrameChannel.java:50)
        at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushFrameWriter(WindowOperatorQueryFrameProcessor.java:302)
        at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.writeRacToFrame(WindowOperatorQueryFrameProcessor.java:262)
        at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushAllRowsAndCols(WindowOperatorQueryFrameProcessor.java:232)
        at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.runIncrementally(WindowOperatorQueryFrameProcessor.java:150)
        at org.apache.druid.msq.counters.CpuTimeAccumulatingFrameProcessor.runIncrementally(CpuTimeAccumulatingFrameProcessor.java:66)
        at org.apache.druid.frame.processor.FrameProcessors$1FrameProcessorWithBaggage.runIncrementally(FrameProcessors.java:72)
        at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:239)
        ... 5 more
   ```
   
   </details>
   
   We missed this flow:
   1. We read the input channel.
   2. We call `runAllOpsOnBatch()`.
   3. If the input channel turned out to be finished in step 1, the `completed()` method of the operatorChain's receiver is called, which calls `flushAllRowsAndCols()` and therefore writes to the output channel.
   4. After this, we call `flushAllRowsAndCols()` again without accounting for the possibility that the input channel has already finished. This attempts to write another frame to the output channel, causing the `Channel has no capacity` error.
   
   This PR fixes the flow above by checking whether the input channel is finished. If it is, we re-run `runIncrementally()` instead of performing step 4.
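
The flow and the guard can be sketched with a toy model. Everything below is an illustrative assumption rather than the actual change: the class names, the `guardEnabled` flag, the two-rows-per-frame limit, and the one-slot output are all hypothetical, and the real `WindowOperatorQueryFrameProcessor` is considerably more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of the problematic flow and the guard; all names are hypothetical.
final class RunIncrementallySketch {
    static final int ROWS_PER_FRAME = 2; // assumed frame size for the example

    // Output that can hold one unread frame, mimicking the capacity error.
    static final class OneSlotOutput {
        private String slot;
        final List<String> delivered = new ArrayList<>();

        void write(String frame) {
            if (slot != null) {
                throw new IllegalStateException("Channel has no capacity");
            }
            slot = frame;
        }

        // Simulates the reader consuming the frame between iterations.
        void drain() {
            if (slot != null) {
                delivered.add(slot);
                slot = null;
            }
        }
    }

    static final class Processor {
        private final OneSlotOutput output;
        private final boolean guardEnabled;
        private final Deque<String> pendingRows = new ArrayDeque<>();
        private boolean completedCalled;

        Processor(OneSlotOutput output, boolean guardEnabled, List<String> rows) {
            this.output = output;
            this.guardEnabled = guardEnabled;
            this.pendingRows.addAll(rows);
        }

        // One iteration; returns true once every pending row has been written.
        boolean runIncrementally(boolean inputFinished) {
            if (inputFinished && !completedCalled) {
                completedCalled = true;
                flushOneFrame(); // step 3: the completion callback flushes
                if (guardEnabled) {
                    // Guard: skip step 4 and ask to be run again if rows remain.
                    return pendingRows.isEmpty();
                }
            }
            flushOneFrame(); // step 4: trailing flush
            return inputFinished && pendingRows.isEmpty();
        }

        // Writes up to ROWS_PER_FRAME pending rows as a single frame.
        private void flushOneFrame() {
            if (pendingRows.isEmpty()) {
                return;
            }
            StringBuilder frame = new StringBuilder();
            for (int i = 0; i < ROWS_PER_FRAME && !pendingRows.isEmpty(); i++) {
                if (i > 0) {
                    frame.append(',');
                }
                frame.append(pendingRows.poll());
            }
            output.write(frame.toString());
        }
    }

    // Drives the processor the way an executor might: run, drain, repeat.
    static List<String> run(boolean guardEnabled) {
        OneSlotOutput output = new OneSlotOutput();
        Processor processor =
            new Processor(output, guardEnabled, Arrays.asList("r1", "r2", "r3", "r4"));
        boolean done = false;
        while (!done) {
            done = processor.runIncrementally(true);
            output.drain();
        }
        return output.delivered;
    }
}
```

In this model, `run(false)` throws on the second write of the first iteration, while `run(true)` delivers the two frames over two iterations because the processor returns after the completion flush and lets the output drain before writing again.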
   
   <hr>
   
   This PR has:
   
   - [x] been self-reviewed.
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

