599166320 opened a new issue, #12958:
URL: https://github.com/apache/druid/issues/12958

   ### Motivation
   
   We would like to use Druid's scan query to perform ORDER BY on arbitrary 
columns, but so far no Druid version supports this.
   
   Currently, ORDER BY is only available through groupBy or topN queries.
   
   However, we are not satisfied with the performance of the groupBy query. 
A topN query performs better, but it is limited: it can only order by a 
single dimension.
   
   We have implemented the following ORDER BY algorithm for the scan query:
   
   Modify the ScanQueryEngine class as follows:
   
   Step 1: scan the column to be sorted together with each row's offset, and 
keep the top-N (sort value, offset) pairs in a TreeMap. The sort-column value 
is the key and the row offset is the value.
   
   Step 2: scan again, extract the rows whose offsets are stored in the 
TreeMap, and return them as the final top-N result.
   
   ```
       // Maps a sort-column value to the offset of the row it was read from; the TreeMap keeps
       // the entries in key order. Diamond operator replaces the raw-type constructor.
       TreeMap<Object, Long> sortValueAndOffset = new TreeMap<>();
       List<String> sortColumns;
       List<String> orderByDirection;
       int limit;
       Sequence<Cursor> cursorSequence = adapter.makeCursors(filter, 
intervals.get(0), query.getVirtualColumns(), Granularities.ALL, 
query.getOrder().equals(ScanQuery.Order.DESCENDING) || 
(query.getOrder().equals(ScanQuery.Order.NONE) && query.isDescending()), null);
       cursorSequence.toList().stream().map(cursor -> new BaseSequence<>(
               new BaseSequence.IteratorMaker<ScanResultValue, 
Iterator<ScanResultValue>>()
               {
                 @Override
                 public Iterator<ScanResultValue> make()
                 {
                   final List<BaseObjectColumnValueSelector> columnSelectors = 
new ArrayList<>(sortColumns.size());
   
                   for (String column : sortColumns) {
                     final BaseObjectColumnValueSelector selector;
   
                     if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
                       selector = cursor.getColumnSelectorFactory()
                               
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
                     } else {
                       selector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
                     }
   
                     columnSelectors.add(selector);
                   }
   
                   return new Iterator<ScanResultValue>()
                   {
                     private long offset = 0;
   
                     @Override
                     public boolean hasNext()
                     {
                       return !cursor.isDone();
                     }
   
                     @Override
                     public ScanResultValue next()
                     {
                       if (!hasNext()) {
                         throw new NoSuchElementException();
                       }
                       if (hasTimeout && System.currentTimeMillis() >= 
timeoutAt) {
                         throw new 
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", 
query.getId()));
                       }
                       final long lastOffset = offset;
                       this.rowsToCompactedList();
                       
responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, offset - lastOffset);
                       if (hasTimeout) {
                         responseContext.put(
                                 ResponseContext.Key.TIMEOUT_AT,
                                 timeoutAt - (System.currentTimeMillis() - 
start)
                         );
                       }
                       return new ScanResultValue(segmentId.toString(), 
allColumns, sortValueAndOffset);
                     }
   
                     @Override
                     public void remove()
                     {
                       throw new UnsupportedOperationException();
                     }
   
                     /**
                      * Drains the cursor and, for every row, stores each sort column's value together
                      * with the row's offset in {@code sortValueAndOffset}. Whenever the map holds more
                      * than {@code limit} entries, one entry is evicted: the first key when that
                      * column's direction is DESCENDING, otherwise the last key.
                      *
                      * NOTE(review): the map is keyed by the sort value, so rows with equal values
                      * overwrite each other, and values of all sort columns share one key space --
                      * confirm this is intended.
                      */
                     private void rowsToCompactedList()
                     {
                       for (; !cursor.isDone(); cursor.advance(), ++this.offset) {
                         for (int col = 0; col < sortColumns.size(); ++col) {
                           sortValueAndOffset.put(this.getColumnValue(col), this.offset);
                           if (sortValueAndOffset.size() <= limit) {
                             continue;
                           }
                           // Over capacity: drop the entry at the losing end of the ordering.
                           final boolean descending =
                               OrderByColumnSpec.Direction.fromString(orderByDirection.get(col))
                               == OrderByColumnSpec.Direction.DESCENDING;
                           if (descending) {
                             sortValueAndOffset.pollFirstEntry();
                           } else {
                             sortValueAndOffset.pollLastEntry();
                           }
                         }
                       }
                     }
   
                     /**
                      * Reads the current row's value for the i-th sort column.
                      *
                      * @param i index into {@code columnSelectors}, which was built from {@code sortColumns}
                      * @return a UTC DateTime when running in legacy mode on the legacy timestamp column;
                      *         otherwise the selector's object, or {@code null} when the selector is null
                      */
                     private Object getColumnValue(int i)
                     {
                       final BaseObjectColumnValueSelector selector = columnSelectors.get(i);
                       final Object value;

                       // The selectors are parallel to sortColumns, so the column name must be looked up
                       // there; indexing allColumns here would test an unrelated column.
                       if (legacy && LEGACY_TIMESTAMP_KEY.equals(sortColumns.get(i))) {
                         value = DateTimes.utc((long) selector.getObject());
                       } else {
                         value = selector == null ? null : selector.getObject();
                       }

                       return value;
                     }
                   };
                 }
   
                 @Override
                 public void cleanup(Iterator<ScanResultValue> iterFromMake)
                 {
                 }
               }
       )).forEach((s) -> {
         s.toList();
       });
   
       // Offsets retained in sortValueAndOffset by the first pass, copied into a set for lookup.
       // Diamond operator replaces the raw-type constructor.
       Set<Long> topKOffset = new HashSet<>(sortValueAndOffset.values());
   
       return Sequences.concat(
               adapter
                       .makeCursors(
                               filter,
                               intervals.get(0),
                               query.getVirtualColumns(),
                               Granularities.ALL,
                               
query.getOrder().equals(ScanQuery.Order.DESCENDING) ||
                                       
(query.getOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
                               null
                       )
                       .map(cursor -> new BaseSequence<>(
                               new BaseSequence.IteratorMaker<ScanResultValue, 
Iterator<ScanResultValue>>()
                               {
                                 @Override
                                 public Iterator<ScanResultValue> make()
                                 {
                                   final List<BaseObjectColumnValueSelector> 
columnSelectors = new ArrayList<>(allColumns.size());
   
                                   for (String column : allColumns) {
                                     final BaseObjectColumnValueSelector 
selector;
   
                                     if (legacy && 
LEGACY_TIMESTAMP_KEY.equals(column)) {
                                       selector = 
cursor.getColumnSelectorFactory()
                                               
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
                                     } else {
                                       selector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
                                     }
   
                                     columnSelectors.add(selector);
                                   }
   
                                   final int batchSize = query.getBatchSize();
                                   return new Iterator<ScanResultValue>()
                                   {
                                     private long offset = 0;
   
                                     @Override
                                     public boolean hasNext()
                                     {
                                       return !cursor.isDone() && offset < 
limit;
                                     }
   
                                     @Override
                                     public ScanResultValue next()
                                     {
                                       if (!hasNext()) {
                                         throw new NoSuchElementException();
                                       }
                                       if (hasTimeout && 
System.currentTimeMillis() >= timeoutAt) {
                                         throw new 
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", 
query.getId()));
                                       }
                                       final long lastOffset = offset;
                                       final Object events;
                                       final ScanQuery.ResultFormat 
resultFormat = query.getResultFormat();
                                       if 
(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
                                         events = rowsToCompactedList();
                                       } else if 
(ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
                                         events = rowsToList();
                                       } else {
                                         throw new UOE("resultFormat[%s] is not 
supported", resultFormat.toString());
                                       }
                                       
responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, offset - lastOffset);
                                       if (hasTimeout) {
                                         responseContext.put(
                                                 ResponseContext.Key.TIMEOUT_AT,
                                                 timeoutAt - 
(System.currentTimeMillis() - start)
                                         );
                                       }
                                       return new 
ScanResultValue(segmentId.toString(), allColumns, events);
                                     }
   
                                     @Override
                                     public void remove()
                                     {
                                       throw new 
UnsupportedOperationException();
                                     }
   
                                     /**
                                      * Builds one batch of rows in compacted form: one value list per row, with
                                      * values in {@code allColumns} order.
                                      *
                                      * When {@code topKOffset} is non-empty the cursor is drained and only rows whose
                                      * offset is contained in that set are kept. Otherwise rows are read until the
                                      * cursor is done or {@code offset} reaches {@code min(limit, offset + batchSize)}.
                                      *
                                      * NOTE(review): rows are appended in cursor order, not in sort-key order --
                                      * confirm that a later stage orders them.
                                      */
                                     private List<List<Object>> rowsToCompactedList()
                                     {
                                       final List<List<Object>> rows = new ArrayList<>(batchSize);

                                       if (topKOffset.isEmpty()) {
                                         final long stopAt = Math.min(limit, offset + batchSize);
                                         while (!cursor.isDone() && offset < stopAt) {
                                           rows.add(readCompactedRow());
                                           cursor.advance();
                                           offset++;
                                         }
                                         return rows;
                                       }

                                       while (!cursor.isDone()) {
                                         if (topKOffset.contains(this.offset)) {
                                           rows.add(readCompactedRow());
                                         }
                                         cursor.advance();
                                         offset++;
                                       }
                                       return rows;
                                     }

                                     /** Copies every column value of the cursor's current row into a new list. */
                                     private List<Object> readCompactedRow()
                                     {
                                       final List<Object> row = new ArrayList<>(allColumns.size());
                                       for (int col = 0; col < allColumns.size(); col++) {
                                         row.add(getColumnValue(col));
                                       }
                                       return row;
                                     }
   
                                     /**
                                      * Builds one batch of rows as maps from column name to value, with keys
                                      * inserted in {@code allColumns} order.
                                      *
                                      * When {@code topKOffset} is non-empty the cursor is drained and only rows whose
                                      * offset is contained in that set are kept. Otherwise rows are read until the
                                      * cursor is done or {@code offset} reaches {@code min(limit, offset + batchSize)}.
                                      *
                                      * NOTE(review): rows are appended in cursor order, not in sort-key order --
                                      * confirm that a later stage orders them.
                                      */
                                     private List<Map<String, Object>> rowsToList()
                                     {
                                       final List<Map<String, Object>> rows = Lists.newArrayListWithCapacity(batchSize);

                                       if (topKOffset.isEmpty()) {
                                         final long stopAt = Math.min(limit, offset + batchSize);
                                         while (!cursor.isDone() && offset < stopAt) {
                                           rows.add(readRowAsMap());
                                           cursor.advance();
                                           offset++;
                                         }
                                         return rows;
                                       }

                                       while (!cursor.isDone()) {
                                         if (topKOffset.contains(this.offset)) {
                                           rows.add(readRowAsMap());
                                         }
                                         cursor.advance();
                                         offset++;
                                       }
                                       return rows;
                                     }

                                     /** Copies every column of the cursor's current row into a new insertion-ordered map. */
                                     private Map<String, Object> readRowAsMap()
                                     {
                                       final Map<String, Object> row = new LinkedHashMap<>();
                                       for (int col = 0; col < allColumns.size(); col++) {
                                         row.put(allColumns.get(col), getColumnValue(col));
                                       }
                                       return row;
                                     }
   
                                     /**
                                      * Reads the current row's value for the i-th entry of {@code allColumns}.
                                      *
                                      * @param i index into {@code allColumns} and the parallel {@code columnSelectors} list
                                      * @return a UTC DateTime when running in legacy mode on the legacy timestamp column;
                                      *         otherwise the selector's object, or {@code null} when the selector is null
                                      */
                                     private Object getColumnValue(int i)
                                     {
                                       final BaseObjectColumnValueSelector columnSelector = columnSelectors.get(i);

                                       // Legacy mode exposes the timestamp column as a DateTime instead of a raw long.
                                       if (legacy && allColumns.get(i).equals(LEGACY_TIMESTAMP_KEY)) {
                                         return DateTimes.utc((long) columnSelector.getObject());
                                       }
                                       if (columnSelector == null) {
                                         return null;
                                       }
                                       return columnSelector.getObject();
                                     }
                                   };
                                 }
   
                                 @Override
                                 public void cleanup(Iterator<ScanResultValue> 
iterFromMake)
                                 {
                                 }
                               }
                       ))
       );
   ```
   
   I am not sure what problems this approach might have, and I would 
appreciate it if someone could point out any issues.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to