599166320 opened a new issue, #12958:
URL: https://github.com/apache/druid/issues/12958
### Motivation
We would like to use Druid's Scan query to order results by arbitrary
columns, but no released version of Druid currently supports this.
Today, the only way to get ordered results is to use a GroupBy or TopN query.
However, we are not satisfied with the performance of GroupBy queries. TopN
performs better, but it has a limitation:
it can only order by a single dimension.
We propose the following order-by algorithm for the Scan query, implemented
by modifying the ScanQueryEngine class:
Step 1: scan the cursor once, reading only the sort column(s) and each row's
offset, and keep the top-N entries in a TreeMap. The sort-column value is the
key and the row offset is the value.
Step 2: scan the cursor a second time, extract the rows whose offsets are
stored in the TreeMap, and return them as the final top-N result.
```
TreeMap<Object, Long> sortValueAndOffset = new TreeMap();
List<String> sortColumns;
List<String> orderByDirection;
int limit;
Sequence<Cursor> cursorSequence = adapter.makeCursors(filter,
intervals.get(0), query.getVirtualColumns(), Granularities.ALL,
query.getOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getOrder().equals(ScanQuery.Order.NONE) && query.isDescending()), null);
cursorSequence.toList().stream().map(cursor -> new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
{
@Override
public Iterator<ScanResultValue> make()
{
final List<BaseObjectColumnValueSelector> columnSelectors =
new ArrayList<>(sortColumns.size());
for (String column : sortColumns) {
final BaseObjectColumnValueSelector selector;
if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
selector = cursor.getColumnSelectorFactory()
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
} else {
selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
}
columnSelectors.add(selector);
}
return new Iterator<ScanResultValue>()
{
private long offset = 0;
/** Reports whether the cursor still has rows left to scan. */
@Override
public boolean hasNext()
{
return !cursor.isDone();
}
/**
 * Runs the first (sort-key collection) pass over the cursor.
 *
 * <p>A single call delegates to {@code rowsToCompactedList()}, which loops until the cursor is
 * done, then records how many rows were scanned in the response context.
 *
 * @return a {@link ScanResultValue} whose events payload is the shared
 *         {@code sortValueAndOffset} map (not a list of rows)
 * @throws NoSuchElementException if the cursor is already exhausted
 * @throws QueryTimeoutException if a timeout is configured and {@code timeoutAt} has been reached
 */
@Override
public ScanResultValue next()
{
  if (!hasNext()) {
    throw new NoSuchElementException();
  }

  final boolean timedOut = hasTimeout && System.currentTimeMillis() >= timeoutAt;
  if (timedOut) {
    final String message = StringUtils.nonStrictFormat("Query [%s] timed out", query.getId());
    throw new QueryTimeoutException(message);
  }

  // Measure how far the offset moved so the scanned-row counter can be updated.
  final long offsetBeforeScan = offset;
  rowsToCompactedList();
  final long scannedRows = offset - offsetBeforeScan;
  responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, scannedRows);

  if (hasTimeout) {
    final long elapsedMillis = System.currentTimeMillis() - start;
    responseContext.put(ResponseContext.Key.TIMEOUT_AT, timeoutAt - elapsedMillis);
  }

  final String segment = segmentId.toString();
  return new ScanResultValue(segment, allColumns, sortValueAndOffset);
}
/**
 * Removal is not supported by this iterator.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
/**
 * Walks the cursor to its end, recording each row's sort value(s) and offset in
 * {@code sortValueAndOffset} and evicting one entry whenever the map grows beyond {@code limit}.
 *
 * <p>For a DESCENDING column the smallest key is evicted; otherwise the largest key is evicted.
 *
 * <p>NOTE(review): the map is keyed by the sort value alone, and {@code TreeMap.put} replaces the
 * mapping of an existing key, so rows that share a sort value overwrite each other and only the
 * last offset survives.
 *
 * <p>NOTE(review): values of every sort column go into the same map, so with more than one sort
 * column the keys of different columns are mixed and compared with each other.
 *
 * <p>NOTE(review): the map is created without a comparator, so a null sort value is expected to
 * make {@code put} throw; confirm how nulls should be ordered.
 */
private void rowsToCompactedList()
{
  for (; !cursor.isDone(); cursor.advance(), ++offset) {
    for (int col = 0; col < sortColumns.size(); col++) {
      sortValueAndOffset.put(getColumnValue(col), offset);
      if (sortValueAndOffset.size() <= limit) {
        continue;
      }
      // The map is over capacity: drop the entry that ranks last for this column's direction.
      final OrderByColumnSpec.Direction direction =
          OrderByColumnSpec.Direction.fromString(orderByDirection.get(col));
      if (direction == OrderByColumnSpec.Direction.DESCENDING) {
        sortValueAndOffset.pollFirstEntry();
      } else {
        sortValueAndOffset.pollLastEntry();
      }
    }
  }
}
/**
 * Reads the current row's value for the i-th sort column.
 *
 * <p>In this pass {@code columnSelectors} is built from {@code sortColumns}, so the
 * legacy-timestamp check has to look at {@code sortColumns.get(i)}. Indexing {@code allColumns}
 * here would inspect an unrelated column name and could run past the end of that list.
 *
 * @param i index into {@code sortColumns} / {@code columnSelectors}
 * @return the selector's current object, converted with {@code DateTimes.utc} when the column is
 *         the legacy timestamp key in legacy mode; {@code null} when no selector exists
 */
private Object getColumnValue(int i)
{
  final BaseObjectColumnValueSelector selector = columnSelectors.get(i);
  if (selector == null) {
    // Guard both branches; previously only the non-legacy branch tolerated a null selector.
    return null;
  }
  if (legacy && LEGACY_TIMESTAMP_KEY.equals(sortColumns.get(i))) {
    return DateTimes.utc((long) selector.getObject());
  }
  return selector.getObject();
}
};
}
/** No-op: performs no cleanup for the iterator produced by {@code make()}. */
@Override
public void cleanup(Iterator<ScanResultValue> iterFromMake)
{
}
}
)).forEach((s) -> {
s.toList();
});
Set<Long> topKOffset = new HashSet(sortValueAndOffset.values());
return Sequences.concat(
adapter
.makeCursors(
filter,
intervals.get(0),
query.getVirtualColumns(),
Granularities.ALL,
query.getOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
null
)
.map(cursor -> new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
{
@Override
public Iterator<ScanResultValue> make()
{
final List<BaseObjectColumnValueSelector>
columnSelectors = new ArrayList<>(allColumns.size());
for (String column : allColumns) {
final BaseObjectColumnValueSelector
selector;
if (legacy &&
LEGACY_TIMESTAMP_KEY.equals(column)) {
selector =
cursor.getColumnSelectorFactory()
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
} else {
selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
}
columnSelectors.add(selector);
}
final int batchSize = query.getBatchSize();
return new Iterator<ScanResultValue>()
{
private long offset = 0;
/**
 * Reports whether another batch can be produced: the cursor must have rows left and
 * {@code offset} must still be below {@code limit}.
 *
 * <p>NOTE(review): {@code offset} is advanced once per scanned row by the row-building methods,
 * so this compares the number of rows scanned, not the number of rows emitted, against the limit.
 */
@Override
public boolean hasNext()
{
  if (cursor.isDone()) {
    return false;
  }
  return offset < limit;
}
@Override
public ScanResultValue next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
if (hasTimeout &&
System.currentTimeMillis() >= timeoutAt) {
throw new
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out",
query.getId()));
}
final long lastOffset = offset;
final Object events;
final ScanQuery.ResultFormat
resultFormat = query.getResultFormat();
if
(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
events = rowsToCompactedList();
} else if
(ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
events = rowsToList();
} else {
throw new UOE("resultFormat[%s] is not
supported", resultFormat.toString());
}
responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, offset - lastOffset);
if (hasTimeout) {
responseContext.put(
ResponseContext.Key.TIMEOUT_AT,
timeoutAt -
(System.currentTimeMillis() - start)
);
}
return new
ScanResultValue(segmentId.toString(), allColumns, events);
}
/**
 * Removal is not supported by this iterator.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void remove()
{
throw new
UnsupportedOperationException();
}
/**
 * Builds one batch of rows in compacted-list form (one {@code List<Object>} per row, values in
 * {@code allColumns} order).
 *
 * <p>When {@code topKOffset} is non-empty, every remaining row of the cursor is scanned and only
 * rows whose offset is contained in {@code topKOffset} are emitted; {@code limit} and
 * {@code batchSize} are not consulted in that mode. Otherwise rows are emitted sequentially
 * until the cursor is done or the offset reaches {@code min(limit, offset + batchSize)}.
 *
 * @return the rows collected by this call, possibly empty
 */
private List<List<Object>> rowsToCompactedList()
{
  final List<List<Object>> batch = new ArrayList<>(batchSize);

  if (!topKOffset.isEmpty()) {
    // Top-K mode: walk the whole cursor, keeping only the pre-selected offsets.
    while (!cursor.isDone()) {
      if (topKOffset.contains(offset)) {
        final List<Object> row = new ArrayList<>(allColumns.size());
        for (int col = 0; col < allColumns.size(); col++) {
          row.add(getColumnValue(col));
        }
        batch.add(row);
      }
      cursor.advance();
      offset++;
    }
    return batch;
  }

  // Plain mode: emit consecutive rows up to the limit or the end of this batch.
  final long stopAt = Math.min(limit, offset + batchSize);
  while (!cursor.isDone() && offset < stopAt) {
    final List<Object> row = new ArrayList<>(allColumns.size());
    for (int col = 0; col < allColumns.size(); col++) {
      row.add(getColumnValue(col));
    }
    batch.add(row);
    cursor.advance();
    offset++;
  }
  return batch;
}
/**
 * Builds one batch of rows in list form (one insertion-ordered column-name-to-value map per row).
 *
 * <p>When {@code topKOffset} is non-empty, every remaining row of the cursor is scanned and only
 * rows whose offset is contained in {@code topKOffset} are emitted; {@code limit} and
 * {@code batchSize} are not consulted in that mode. Otherwise rows are emitted sequentially
 * until the cursor is done or the offset reaches {@code min(limit, offset + batchSize)}.
 *
 * @return the rows collected by this call, possibly empty
 */
private List<Map<String, Object>> rowsToList()
{
  final List<Map<String, Object>> batch = Lists.newArrayListWithCapacity(batchSize);

  if (!topKOffset.isEmpty()) {
    // Top-K mode: walk the whole cursor, keeping only the pre-selected offsets.
    while (!cursor.isDone()) {
      if (topKOffset.contains(offset)) {
        final Map<String, Object> row = new LinkedHashMap<>();
        for (int col = 0; col < allColumns.size(); col++) {
          row.put(allColumns.get(col), getColumnValue(col));
        }
        batch.add(row);
      }
      cursor.advance();
      offset++;
    }
    return batch;
  }

  // Plain mode: emit consecutive rows up to the limit or the end of this batch.
  final long stopAt = Math.min(limit, offset + batchSize);
  while (!cursor.isDone() && offset < stopAt) {
    final Map<String, Object> row = new LinkedHashMap<>();
    for (int col = 0; col < allColumns.size(); col++) {
      row.put(allColumns.get(col), getColumnValue(col));
    }
    batch.add(row);
    cursor.advance();
    offset++;
  }
  return batch;
}
/**
 * Reads the current row's value for the i-th entry of {@code allColumns}.
 *
 * @param i index into {@code allColumns} / {@code columnSelectors}
 * @return the selector's current object, converted with {@code DateTimes.utc} when the column is
 *         the legacy timestamp key in legacy mode; {@code null} when the selector is null
 */
private Object getColumnValue(int i)
{
  final BaseObjectColumnValueSelector selector = columnSelectors.get(i);
  final boolean isLegacyTimestamp = legacy && allColumns.get(i).equals(LEGACY_TIMESTAMP_KEY);
  if (isLegacyTimestamp) {
    return DateTimes.utc((long) selector.getObject());
  }
  if (selector == null) {
    return null;
  }
  return selector.getObject();
}
};
}
/** No-op: performs no cleanup for the iterator produced by {@code make()}. */
@Override
public void cleanup(Iterator<ScanResultValue>
iterFromMake)
{
}
}
))
);
```
I am not sure what problems this approach may have, so I would appreciate it
if someone could point out any issues with it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]