cryptoe commented on code in PR #13952: URL: https://github.com/apache/druid/pull/13952#discussion_r1229354540
########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalkerUtils.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +public class ClientQuerySegmentWalkerUtils +{ + public enum SubqueryResultLimit + { + ROW_LIMIT, Review Comment: Please add java docs here. ########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java: ########## @@ -613,96 +638,149 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource( final QueryToolChest<T, QueryType> toolChest, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, + final AtomicBoolean cannotMaterializeToFrames, final int limit, - long memoryLimit + long memoryLimit, + boolean useNestedForUnknownTypeInSubquery ) { - final int limitToUse = limit < 0 ? 
Integer.MAX_VALUE : limit; - boolean memoryLimitSet = memoryLimit >= 0; - - if (limitAccumulator.get() >= limitToUse) { - throw ResourceLimitExceededException.withMessage( - "Cannot issue subquery, maximum[%d] reached", - limitToUse - ); - } - - if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) { - throw ResourceLimitExceededException.withMessage( - "Cannot issue subquery, maximum subquery result bytes[%d] reached", - memoryLimit - ); - } + final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit; DataSource dataSource; - // Try to serialize the results into a frame only if the memory limit is set on the server or the query - if (memoryLimitSet) { - try { - Optional<Sequence<FrameSignaturePair>> framesOptional = toolChest.resultsAsFrames( + + switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, cannotMaterializeToFrames.get())) { + case ROW_LIMIT: + if (limitAccumulator.get() >= rowLimitToUse) { + throw ResourceLimitExceededException.withMessage( + "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", + rowLimitToUse + ); + } + dataSource = materializeResultsAsArray( query, results, - memoryLimit - memoryLimitAccumulator.get() + toolChest, + limitAccumulator, + limit ); - - if (!framesOptional.isPresent()) { - throw new ISE("The memory of the subqueries cannot be estimated correctly."); + break; + case MEMORY_LIMIT: + if (memoryLimitAccumulator.get() >= memoryLimit) { + throw ResourceLimitExceededException.withMessage( + "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", + memoryLimit + ); } - - Sequence<FrameSignaturePair> frames = framesOptional.get(); - List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>(); - frames.forEach( - frame -> { - if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] bytes", - memoryLimit - ); - - } - if 
(limitAccumulator.addAndGet(frame.getFrame().numRows()) >= limitToUse) { - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] rows", - limitToUse - ); - } - frameSignaturePairs.add(frame); - } - ); - dataSource = new FramesBackedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)); - } - catch (ResourceLimitExceededException rlee) { - throw rlee; - } - catch (Exception e) { - log.info( - "Unable to write the subquery results to a frame. Results won't be accounted for in the memory " - + "calculation" + Optional<DataSource> maybeDataSource = materializeResultsAsFrames( + query, + results, + toolChest, + limitAccumulator, + memoryLimitAccumulator, + memoryLimit, + useNestedForUnknownTypeInSubquery ); - throw e; - } - } else { - final RowSignature signature = toolChest.resultArraySignature(query); - - final ArrayList<Object[]> resultList = new ArrayList<>(); - - toolChest.resultsAsArrays(query, results).accumulate( - resultList, - (acc, in) -> { - if (limitAccumulator.getAndIncrement() >= limitToUse) { - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] rows", - limitToUse - ); - } - acc.add(in); - return acc; + if (!maybeDataSource.isPresent()) { + cannotMaterializeToFrames.set(true); + // Check if the previous row limit accumulator has exceeded the memory results + if (memoryLimitAccumulator.get() >= memoryLimit) { + throw ResourceLimitExceededException.withMessage( + "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", + memoryLimit + ); } - ); - dataSource = InlineDataSource.fromIterable(resultList, signature); + dataSource = materializeResultsAsArray( + query, + results, + toolChest, + limitAccumulator, + limit + ); + } else { + dataSource = maybeDataSource.get(); + } + break; + default: + throw new IAE("Only row based and memory based limiting is supported"); } return dataSource; } + private static <T, QueryType 
extends Query<T>> Optional<DataSource> materializeResultsAsFrames( + final QueryType query, + final Sequence<T> results, + final QueryToolChest<T, QueryType> toolChest, + final AtomicInteger limitAccumulator, + final AtomicLong memoryLimitAccumulator, + long memoryLimit, + boolean useNestedForUnknownTypeInSubquery + ) + { + Optional<Sequence<FrameSignaturePair>> framesOptional; + + try { + framesOptional = toolChest.resultsAsFrames( + query, + results, + new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()), + useNestedForUnknownTypeInSubquery + ); + } + catch (Exception e) { + return Optional.empty(); + } + + if (!framesOptional.isPresent()) { + return Optional.empty(); + } + + Sequence<FrameSignaturePair> frames = framesOptional.get(); + List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>(); + frames.forEach( + frame -> { + limitAccumulator.addAndGet(frame.getFrame().numRows()); + if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { + throw ResourceLimitExceededException.withMessage( + "Subquery generated results beyond maximum[%d] bytes", Review Comment: This method needs Javadoc. Line 730 swallows exceptions in order to fall back to the row-based path; let's document this. ########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java: ########## @@ -613,96 +638,149 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource( final QueryToolChest<T, QueryType> toolChest, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, + final AtomicBoolean cannotMaterializeToFrames, final int limit, - long memoryLimit + long memoryLimit, + boolean useNestedForUnknownTypeInSubquery ) { - final int limitToUse = limit < 0 ? 
Integer.MAX_VALUE : limit; - boolean memoryLimitSet = memoryLimit >= 0; - - if (limitAccumulator.get() >= limitToUse) { - throw ResourceLimitExceededException.withMessage( - "Cannot issue subquery, maximum[%d] reached", - limitToUse - ); - } - - if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) { - throw ResourceLimitExceededException.withMessage( - "Cannot issue subquery, maximum subquery result bytes[%d] reached", - memoryLimit - ); - } + final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit; DataSource dataSource; - // Try to serialize the results into a frame only if the memory limit is set on the server or the query - if (memoryLimitSet) { - try { - Optional<Sequence<FrameSignaturePair>> framesOptional = toolChest.resultsAsFrames( + + switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, cannotMaterializeToFrames.get())) { + case ROW_LIMIT: + if (limitAccumulator.get() >= rowLimitToUse) { + throw ResourceLimitExceededException.withMessage( + "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", + rowLimitToUse + ); + } + dataSource = materializeResultsAsArray( query, results, - memoryLimit - memoryLimitAccumulator.get() + toolChest, + limitAccumulator, + limit ); - - if (!framesOptional.isPresent()) { - throw new ISE("The memory of the subqueries cannot be estimated correctly."); + break; + case MEMORY_LIMIT: + if (memoryLimitAccumulator.get() >= memoryLimit) { + throw ResourceLimitExceededException.withMessage( + "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", + memoryLimit + ); } - - Sequence<FrameSignaturePair> frames = framesOptional.get(); - List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>(); - frames.forEach( - frame -> { - if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] bytes", - memoryLimit - ); - - } - if 
(limitAccumulator.addAndGet(frame.getFrame().numRows()) >= limitToUse) { - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] rows", - limitToUse - ); - } - frameSignaturePairs.add(frame); - } - ); - dataSource = new FramesBackedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)); - } - catch (ResourceLimitExceededException rlee) { - throw rlee; - } - catch (Exception e) { - log.info( - "Unable to write the subquery results to a frame. Results won't be accounted for in the memory " - + "calculation" + Optional<DataSource> maybeDataSource = materializeResultsAsFrames( + query, + results, + toolChest, + limitAccumulator, + memoryLimitAccumulator, + memoryLimit, + useNestedForUnknownTypeInSubquery ); - throw e; - } - } else { - final RowSignature signature = toolChest.resultArraySignature(query); - - final ArrayList<Object[]> resultList = new ArrayList<>(); - - toolChest.resultsAsArrays(query, results).accumulate( - resultList, - (acc, in) -> { - if (limitAccumulator.getAndIncrement() >= limitToUse) { - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] rows", - limitToUse - ); - } - acc.add(in); - return acc; + if (!maybeDataSource.isPresent()) { + cannotMaterializeToFrames.set(true); + // Check if the previous row limit accumulator has exceeded the memory results + if (memoryLimitAccumulator.get() >= memoryLimit) { + throw ResourceLimitExceededException.withMessage( + "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", + memoryLimit + ); } - ); - dataSource = InlineDataSource.fromIterable(resultList, signature); + dataSource = materializeResultsAsArray( + query, + results, + toolChest, + limitAccumulator, + limit + ); + } else { + dataSource = maybeDataSource.get(); + } + break; + default: + throw new IAE("Only row based and memory based limiting is supported"); } return dataSource; } + private static <T, QueryType 
extends Query<T>> Optional<DataSource> materializeResultsAsFrames( + final QueryType query, + final Sequence<T> results, + final QueryToolChest<T, QueryType> toolChest, + final AtomicInteger limitAccumulator, + final AtomicLong memoryLimitAccumulator, + long memoryLimit, + boolean useNestedForUnknownTypeInSubquery + ) + { + Optional<Sequence<FrameSignaturePair>> framesOptional; + + try { + framesOptional = toolChest.resultsAsFrames( + query, + results, + new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()), + useNestedForUnknownTypeInSubquery + ); + } + catch (Exception e) { + return Optional.empty(); Review Comment: Please add a debug log line here so that we can tell which exception caused the fallback. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
