capistrant commented on code in PR #19426:
URL: https://github.com/apache/druid/pull/19426#discussion_r3228008383


##########
multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProvider.java:
##########
@@ -67,27 +68,55 @@ public ResourceHolder<ProcessingBuffersSet> acquire(final 
int poolSize, final lo
     final ReferenceCountingResourceHolder<ByteBuffer> bufferHolder = 
batch.get(0);
     try {
       final ByteBuffer buffer = bufferHolder.get().duplicate();
-      final int sliceSize = buffer.capacity() / poolSize / processingThreads;
-      final List<ProcessingBuffers> pool = new ArrayList<>(poolSize);
+      final int chunkSize = buffer.capacity() / poolSize;
+      final List<ProcessingBuffersSet.Slot> slots = new ArrayList<>(poolSize);
 
       for (int i = 0; i < poolSize; i++) {
-        final BlockingQueue<ByteBuffer> queue = new 
ArrayBlockingQueue<>(processingThreads);
-        for (int j = 0; j < processingThreads; j++) {
-          final int sliceNum = i * processingThreads + j;
-          buffer.position(sliceSize * sliceNum).limit(sliceSize * (sliceNum + 
1));
-          queue.add(buffer.slice());
-        }
-        final ProcessingBuffers buffers = new ProcessingBuffers(
-            new QueueNonBlockingPool<>(queue),
-            new Bouncer(processingThreads)
-        );
-        pool.add(buffers);
+        buffer.position(chunkSize * i).limit(chunkSize * (i + 1));
+        slots.add(new LazySlot(buffer.slice(), processingThreads));
       }
 
-      return new ReferenceCountingResourceHolder<>(new 
ProcessingBuffersSet(pool), bufferHolder);
+      return new ReferenceCountingResourceHolder<>(new 
ProcessingBuffersSet(slots), bufferHolder);
     }
     catch (Throwable e) {
       throw CloseableUtils.closeAndWrapInCatch(e, bufferHolder);
     }
   }
+
+  /**
+   * Lazy slot that holds one chunk of the shared merge buffer and slices it 
on demand to match the stage's
+   * actual concurrent-processor count.
+   */
+  static final class LazySlot implements ProcessingBuffersSet.Slot
+  {
+    private final ByteBuffer chunk;
+    private final int maxSlices;
+
+    LazySlot(final ByteBuffer chunk, final int maxSlices)
+    {
+      this.chunk = chunk;
+      this.maxSlices = maxSlices;
+    }
+
+    @Override
+    public ProcessingBuffers acquire(final int requestedSlices)
+    {
+      if (requestedSlices > maxSlices) {
+        throw DruidException.defensive(
+            "requestedSlices[%d] too large for maxSlices[%d]",
+            requestedSlices,
+            maxSlices
+        );
+      }
+
+      final int sliceSize = chunk.capacity() / requestedSlices;

Review Comment:
   defensive check that requested slices > 0 or do we trust the upstream 
callers always request at least one slice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to