clintropolis commented on a change in pull request #7133: Time Ordering On Scans
URL: https://github.com/apache/incubator-druid/pull/7133#discussion_r269812730
 
 

 ##########
 File path: 
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
 ##########
 @@ -68,34 +89,224 @@ public ScanQueryRunnerFactory(
   )
   {
     // in single thread and in jetty thread instead of processing thread
-    return new QueryRunner<ScanResultValue>()
-    {
-      @Override
-      public Sequence<ScanResultValue> run(
-          final QueryPlus<ScanResultValue> queryPlus, final Map<String, 
Object> responseContext
-      )
-      {
-        // Note: this variable is effective only when queryContext has a 
timeout.
-        // See the comment of CTX_TIMEOUT_AT.
-        final long timeoutAt = System.currentTimeMillis() + 
QueryContexts.getTimeout(queryPlus.getQuery());
-        responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
-        return Sequences.concat(
+    return (queryPlus, responseContext) -> {
+      ScanQuery query = (ScanQuery) queryPlus.getQuery();
+
+      // Note: this variable is effective only when queryContext has a timeout.
+      // See the comment of CTX_TIMEOUT_AT.
+      final long timeoutAt = System.currentTimeMillis() + 
QueryContexts.getTimeout(queryPlus.getQuery());
+      responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
+
+      if (query.getOrder().equals(ScanQuery.Order.NONE)) {
+        // Use normal strategy
+        Sequence<ScanResultValue> returnedRows = Sequences.concat(
             Sequences.map(
                 Sequences.simple(queryRunners),
-                new Function<QueryRunner<ScanResultValue>, 
Sequence<ScanResultValue>>()
-                {
-                  @Override
-                  public Sequence<ScanResultValue> apply(final 
QueryRunner<ScanResultValue> input)
-                  {
-                    return input.run(queryPlus, responseContext);
-                  }
-                }
+                input -> input.run(queryPlus, responseContext)
             )
         );
+        if (query.getLimit() <= Integer.MAX_VALUE) {
+          return returnedRows.limit(Math.toIntExact(query.getLimit()));
+        } else {
+          return returnedRows;
+        }
+      } else {
+        if (!(query.getQuerySegmentSpec() instanceof 
MultipleSpecificSegmentSpec)) {
+          throw new UOE("Time-ordering on scan queries is only supported for 
queries with segment specs"
+                        + "of type MultipleSpecificSegmentSpec");
+        }
+        List<SegmentDescriptor> descriptorsOrdered =
+            ((MultipleSpecificSegmentSpec) 
query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
+        List<QueryRunner<ScanResultValue>> queryRunnersOrdered = 
Lists.newArrayList(queryRunners); // Ascending time order by default
+
+        if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
+          descriptorsOrdered = Lists.reverse(descriptorsOrdered);
+          queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
+        }
+
+        if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) 
{
+          // Use priority queue strategy
+          return priorityQueueSortAndLimit(
+              Sequences.concat(Sequences.map(
+                  Sequences.simple(queryRunnersOrdered),
+                  input -> input.run(queryPlus, responseContext)
+              )),
+              query,
+              descriptorsOrdered
+          );
+        } else {
+          Preconditions.checkState(
+              descriptorsOrdered.size() == queryRunnersOrdered.size(),
+              "Number of segment descriptors does not equal number of "
+              + "query runners...something went wrong!"
+          );
+
+          // Combine the two lists of segment descriptors and query runners 
into a single list of
+          // segment descriptors - query runner pairs.  This makes it easier 
to use stream operators.
+          List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> 
descriptorsAndRunnersOrdered = new ArrayList<>();
+          for (int i = 0; i < queryRunnersOrdered.size(); i++) {
+            descriptorsAndRunnersOrdered.add(new 
Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+          }
+
+          // Group the list of pairs by interval.  The LinkedHashMap will have 
an interval paired with a list of all the
+          // query runners for that segment
+          LinkedHashMap<Interval, List<Pair<SegmentDescriptor, 
QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+              descriptorsAndRunnersOrdered.stream()
+                                          .collect(Collectors.groupingBy(
+                                              x -> x.lhs.getInterval(),
+                                              LinkedHashMap::new,
+                                              Collectors.toList()
+                                          ));
+
+          // Find the segment with the largest numbers of partitions.  This 
will be used to compare with the
+          // maxSegmentPartitionsOrderedInMemory limit to determine if the 
query is at risk of consuming too much memory.
+          int maxNumPartitionsInSegment =
+              partitionsGroupedByInterval.values()
+                                         .stream()
+                                         .map(x -> x.size())
+                                         
.max(Comparator.comparing(Integer::valueOf))
+                                         .get();
+
+          if (maxNumPartitionsInSegment <= 
scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
+            // Use n-way merge strategy
+
+            // Create a list of grouped runner lists (i.e. each 
sublist/"runner group" corresponds to an interval) ->
+            // there should be no interval overlap.  We create a list of lists 
so we can create a sequence of sequences.
+            // There's no easy way to convert a LinkedHashMap to a sequence 
because it's non-iterable.
+            List<List<QueryRunner<ScanResultValue>>> groupedRunners =
+                partitionsGroupedByInterval.entrySet()
+                                           .stream()
+                                           .map(entry -> entry.getValue()
+                                                              .stream()
+                                                              
.map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
+                                                              
.collect(Collectors.toList()))
+                                           .collect(Collectors.toList());
+
+            return nWayMergeAndLimit(groupedRunners, queryPlus, 
responseContext);
+          }
+          throw new UOE(
+              "Time ordering for queries of %,d partitions per segment and a 
row limit of %,d is not supported."
+              + "  Try reducing the scope of the query to scan fewer 
partitions than the configurable limit of"
+              + " %,d partitions or lower the row limit below %,d.",
+              maxNumPartitionsInSegment,
+              query.getLimit(),
+              scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
+              scanQueryConfig.getMaxRowsQueuedForOrdering()
+          );
+        }
       }
     };
   }
 
+  @VisibleForTesting
+  Sequence<ScanResultValue> priorityQueueSortAndLimit(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery,
+      List<SegmentDescriptor> descriptorsOrdered
+  )
+  {
+    Comparator<ScanResultValue> priorityQComparator = new 
ScanResultValueTimestampComparator(scanQuery);
+
+    // Converting the limit from long to int could theoretically throw an 
ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which 
should be < Integer.MAX_VALUE)
+    int limit = Math.toIntExact(scanQuery.getLimit());
+
+    PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, 
priorityQComparator);
+
+    Yielder<ScanResultValue> yielder = inputSequence.toYielder(
+        null,
+        new YieldingAccumulator<ScanResultValue, ScanResultValue>()
+        {
+          @Override
+          public ScanResultValue accumulate(ScanResultValue accumulated, 
ScanResultValue in)
+          {
+            yield();
+            return in;
+          }
+        }
+    );
+    boolean doneScanning = yielder.isDone();
+    // We need to scan limit elements and anything else in the last segment
+    int numRowsScanned = 0;
+    Interval finalInterval = null;
+    while (!doneScanning) {
+      ScanResultValue next = yielder.get();
+      List<ScanResultValue> singleEventScanResultValues = 
next.toSingleEventScanResultValues();
+      for (ScanResultValue srv : singleEventScanResultValues) {
+        numRowsScanned++;
+        // Using an intermediate unbatched ScanResultValue is not that great 
memory-wise, but the column list
+        // needs to be preserved for queries using the compactedList result 
format
+        q.offer(srv);
+        if (q.size() > limit) {
+          q.poll();
+        }
+
+        // Finish scanning the interval containing the limit row
+        if (numRowsScanned > limit && finalInterval == null) {
+          long timestampOfLimitRow = 
srv.getFirstEventTimestamp(scanQuery.getResultFormat());
+          for (SegmentDescriptor descriptor : descriptorsOrdered) {
+            if (descriptor.getInterval().contains(timestampOfLimitRow)) {
+              finalInterval = descriptor.getInterval();
+            }
+          }
+          if (finalInterval == null) {
+            throw new ISE("WTH???  Row came from an unscanned interval?");
+          }
+        }
+      }
+      yielder = yielder.next(null);
+      doneScanning = yielder.isDone() ||
+                     (finalInterval != null &&
+                      
!finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat())));
+    }
+    // Need to convert to a Deque because Priority Queue's iterator doesn't 
guarantee that the sorted order
+    // will be maintained.  Deque was chosen over list because its addFirst is 
O(1).
+    final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
+    while (q.size() != 0) {
+      // addFirst is used since PriorityQueue#poll() dequeues the low-priority 
(timestamp-wise) events first.
+      sortedElements.addFirst(q.poll());
+    }
+    return Sequences.simple(sortedElements);
+  }
+
+  @VisibleForTesting
+  Sequence<ScanResultValue> nWayMergeAndLimit(
+      List<List<QueryRunner<ScanResultValue>>> groupedRunners,
+      QueryPlus<ScanResultValue> queryPlus,
+      Map<String, Object> responseContext
+  )
+  {
+    // Starting from the innermost Sequences.map:
+    // (1) Deaggregate each ScanResultValue returned by the query runners
+    // (2) Combine the deaggregated ScanResultValues into a single sequence
+    // (3) Create a sequence of results from each runner in the group and 
flatmerge based on timestamp
+    // (4) Create a sequence of results from each runner group
+    // (5) Join all the results into a single sequence
+
+    return Sequences.concat(
+        Sequences.map(
+            Sequences.simple(groupedRunners),
+            runnerGroup ->
+                Sequences.map(
+                    Sequences.simple(runnerGroup),
+                    (input) -> Sequences.concat(
+                        Sequences.map(
+                            input.run(queryPlus, responseContext),
+                            srv -> 
Sequences.simple(srv.toSingleEventScanResultValues())
+                        )
+                    )
+                ).flatMerge(
+                    seq -> seq,
+                    Ordering.from(new ScanResultValueTimestampComparator(
+                        (ScanQuery) queryPlus.getQuery()
+                    )).reverse()
+                )
+        )
+    ).limit(
+        Math.toIntExact(((ScanQuery) (queryPlus.getQuery())).getLimit())
 
 Review comment:
   If I do not specify a `limit` parameter but do specify an `order`, I end up 
in here with `limit` set to `Long.MAX_VALUE`, and `Math.toIntExact` throws an 
`ArithmeticException`:
   ```
   2019-03-27T23:39:05,477 ERROR 
[qtp904351240-69[scan_[wikiticker]_e44c2050-ecd4-45b7-ba3e-28913a46581e]] 
org.apache.druid.server.QueryResource - Exception handling request: 
{class=org.apache.druid.server.QueryResource, exceptionType=class 
java.lang.ArithmeticException, exceptionMessage=integer overflow, 
exception=java.lang.ArithmeticException: integer overflow, 
query=ScanQuery{dataSource='wikiticker', 
querySegmentSpec=MultipleSpecificSegmentSpec{descriptors=[SegmentDescriptor{interval=2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z,
 version='2019-03-08T11:26:16.569Z', partitionNumber=0}]}, virtualColumns=[], 
resultFormat='list', batchSize=20480, limit=9223372036854775807, 
dimFilter=null, columns=[], legacy=false}, peer=127.0.0.1}
   java.lang.ArithmeticException: integer overflow
        at java.lang.Math.toIntExact(Math.java:1011) ~[?:1.8.0_192]
        at 
org.apache.druid.query.scan.ScanQueryRunnerFactory.nWayMergeAndLimit(ScanQueryRunnerFactory.java:306)
 ~[classes/:?]
   ...
   ```
   I haven't quite figured out yet what the correct behavior should be in that 
case, but this error isn't very friendly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to