kgyrtkirk commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1407604341

##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -127,17 +123,32 @@ public GroupingEngine(
    *
    * @return broker resource
    */
-  public GroupByQueryResources prepareResource(GroupByQuery query)
+  public static GroupByQueryResources prepareResource(
+      GroupByQuery query,
+      BlockingPool<ByteBuffer> mergeBufferPool,
+      boolean usesGroupByMergingQueryRunner,
+      GroupByQueryConfig groupByQueryConfig
+  )
   {
-    final int requiredMergeBufferNum = GroupByQueryResources.countRequiredMergeBufferNum(query);
+
+    final int requiredMergeBufferNumForToolchestMerge =
+        GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query);
+
+    final int requiredMergeBufferNumForMergingQueryRunner =
+        usesGroupByMergingQueryRunner
+        ? GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(groupByQueryConfig, query)
+        : 0;

Review Comment:
   I find it a pretty odd contract that it gets counted here and used somewhere else.
   I think decoupling things like this could increase complexity significantly.

##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java:
##########
@@ -311,39 +310,26 @@ public void cleanup(CloseableGrouperIterator<RowBasedKey, ResultRow> iterFromMak
   private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
       int numBuffers,
-      boolean hasTimeout,
-      long timeoutAt
+      ResponseContext responseContext
   )
   {
-    try {
-      if (numBuffers > mergeBufferPool.maxSize()) {
-        throw new ResourceLimitExceededException(
-            "Query needs " + numBuffers + " merge buffers, but only "
-            + mergeBufferPool.maxSize() + " merge buffers were configured. "
-            + "Try raising druid.processing.numMergeBuffers."
-        );
-      }
-      final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolder;
-      // This will potentially block if there are no merge buffers left in the pool.
-      if (hasTimeout) {
-        final long timeout = timeoutAt - System.currentTimeMillis();
-        if (timeout <= 0) {
-          throw new QueryTimeoutException();
-        }
-        if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
-          throw new QueryTimeoutException("Cannot acquire enough merge buffers");
-        }
-      } else {
-        mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
-      }
-      return mergeBufferHolder;
+    GroupByQueryResources resource = (GroupByQueryResources) responseContext.get(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS);

Review Comment:
   It seems like `responseContext` is being used as a backplane here...

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: commits-h...@druid.apache.org