This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f5f072c Optimize IndexedTable (#7373)
f5f072c is described below
commit f5f072ceeb151e2694f77987f56b88334d6fe79a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Aug 30 13:55:02 2021 -0700
Optimize IndexedTable (#7373)
- When resizing the records map, do not recreate the map but clear it to
avoid growing the map again. This can also reduce the GC required
- When calculating the final results on server side, do not recreate the
map but directly create a list to return the results
- Move the common logic to the base class `IndexedTable`
---
.../core/data/table/ConcurrentIndexedTable.java | 94 ++--------------
.../apache/pinot/core/data/table/IndexedTable.java | 56 +++++++++-
.../pinot/core/data/table/IntermediateRecord.java | 19 ++--
.../pinot/core/data/table/SimpleIndexedTable.java | 82 ++------------
.../apache/pinot/core/data/table/TableResizer.java | 123 ++++++++++-----------
.../table/UnboundedConcurrentIndexedTable.java | 8 +-
.../combine/GroupByOrderByCombineOperator.java | 7 +-
.../pinot/core/data/table/TableResizerTest.java | 36 +++---
8 files changed, 163 insertions(+), 262 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 4e80e09..3e54e92 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -19,37 +19,23 @@
package org.apache.pinot.core.data.table;
import com.google.common.base.Preconditions;
-import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Thread safe {@link Table} implementation for aggregating Records based on
combination of keys
*/
+@SuppressWarnings("unchecked")
public class ConcurrentIndexedTable extends IndexedTable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConcurrentIndexedTable.class);
-
- protected volatile ConcurrentMap<Key, Record> _lookupMap;
- protected final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
- private Iterator<Record> _iterator;
- private final ReentrantReadWriteLock _readWriteLock;
- private final AtomicInteger _numResizes = new AtomicInteger();
- private final AtomicLong _resizeTimeMs = new AtomicLong();
+ private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+ private final ReentrantReadWriteLock _readWriteLock = new
ReentrantReadWriteLock();
public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext
queryContext, int trimSize, int trimThreshold) {
- super(dataSchema, queryContext, trimSize, trimThreshold);
- _lookupMap = new ConcurrentHashMap<>();
- _readWriteLock = new ReentrantReadWriteLock();
+ super(dataSchema, queryContext, trimSize, trimThreshold, new
ConcurrentHashMap<>());
}
/**
@@ -58,7 +44,8 @@ public class ConcurrentIndexedTable extends IndexedTable {
@Override
public boolean upsert(Key key, Record newRecord) {
Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
- if (_noMoreNewRecords.get()) { // allow only existing record updates
+ if (_noMoreNewRecords.get()) {
+ // allow only existing record updates
_lookupMap.computeIfPresent(key, (k, v) -> {
Object[] existingValues = v.getValues();
Object[] newValues = newRecord.getValues();
@@ -68,8 +55,8 @@ public class ConcurrentIndexedTable extends IndexedTable {
}
return v;
});
- } else { // allow all records
-
+ } else {
+ // allow all records
_readWriteLock.readLock().lock();
try {
_lookupMap.compute(key, (k, v) -> {
@@ -96,7 +83,7 @@ public class ConcurrentIndexedTable extends IndexedTable {
_readWriteLock.writeLock().lock();
try {
if (_lookupMap.size() >= _trimThreshold) {
- resize(_trimSize);
+ resize();
}
} finally {
_readWriteLock.writeLock().unlock();
@@ -109,67 +96,4 @@ public class ConcurrentIndexedTable extends IndexedTable {
}
return true;
}
-
- @Override
- public int size() {
- return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
- }
-
- @Override
- public Iterator<Record> iterator() {
- return _iterator;
- }
-
- private void resize(int trimToSize) {
- long startTime = System.currentTimeMillis();
- // when the resizer trims using a PQ, it will return a new trimmed map.
- // the reference held by the indexed table needs to be updated. this is
also
- // the reason why it is volatile since the thread doing the resize will
result in
- // a new reference
- _lookupMap = (ConcurrentMap) _tableResizer.resizeRecordsMap(_lookupMap,
trimToSize);
- long endTime = System.currentTimeMillis();
- long timeElapsed = endTime - startTime;
- _numResizes.incrementAndGet();
- _resizeTimeMs.addAndGet(timeElapsed);
- }
-
- private List<Record> resizeAndSort(int trimToSize) {
- long startTime = System.currentTimeMillis();
- List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap,
trimToSize);
- long endTime = System.currentTimeMillis();
- long timeElapsed = endTime - startTime;
- _numResizes.incrementAndGet();
- _resizeTimeMs.addAndGet(timeElapsed);
- return sortedRecords;
- }
-
- @Override
- public void finish(boolean sort) {
- if (_hasOrderBy) {
- if (sort) {
- _sortedRecords = resizeAndSort(_trimSize);
- _iterator = _sortedRecords.iterator();
- } else {
- resize(_trimSize);
- }
- int numResizes = _numResizes.get();
- long resizeTime = _resizeTimeMs.get();
- LOGGER.debug(
- "Num resizes : {}, Total time spent in resizing : {}, Avg resize
time : {}, trimSize: {}, trimThreshold: {}",
- numResizes, resizeTime, numResizes == 0 ? 0 : resizeTime /
numResizes, _trimSize, _trimThreshold);
- }
- if (_iterator == null) {
- _iterator = _lookupMap.values().iterator();
- }
- }
-
- @Override
- public int getNumResizes() {
- return _numResizes.get();
- }
-
- @Override
- public long getResizeTimeMs() {
- return _resizeTimeMs.get();
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index c145706..2ac82e7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -19,7 +19,11 @@
package org.apache.pinot.core.data.table;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
@@ -32,18 +36,24 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
*/
@SuppressWarnings("rawtypes")
public abstract class IndexedTable extends BaseTable {
+ protected final Map<Key, Record> _lookupMap;
protected final int _numKeyColumns;
protected final AggregationFunction[] _aggregationFunctions;
protected final boolean _hasOrderBy;
protected final TableResizer _tableResizer;
- protected List<Record> _sortedRecords;
// The size we need to trim to
protected final int _trimSize;
// The size with added buffer, in order to collect more records than
capacity for better precision
protected final int _trimThreshold;
- protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int
trimSize, int trimThreshold) {
+ protected Collection<Record> _topRecords;
+ private int _numResizes;
+ private long _resizeTimeNs;
+
+ protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int
trimSize, int trimThreshold,
+ Map<Key, Record> lookupMap) {
super(dataSchema);
+ _lookupMap = lookupMap;
List<ExpressionContext> groupByExpressions =
queryContext.getGroupByExpressions();
assert groupByExpressions != null;
_numKeyColumns = groupByExpressions.size();
@@ -84,7 +94,45 @@ public abstract class IndexedTable extends BaseTable {
return upsert(new Key(keyValues), record);
}
- public abstract int getNumResizes();
+ @Override
+ public int size() {
+ return _topRecords != null ? _topRecords.size() : _lookupMap.size();
+ }
+
+ /**
+ * Resizes the lookup map based on the trim size.
+ */
+ protected void resize() {
+ long startTimeNs = System.nanoTime();
+ _tableResizer.resizeRecordsMap(_lookupMap, _trimSize);
+ long resizeTimeNs = System.nanoTime() - startTimeNs;
+ _numResizes++;
+ _resizeTimeNs += resizeTimeNs;
+ }
+
+ @Override
+ public void finish(boolean sort) {
+ if (_hasOrderBy) {
+ long startTimeNs = System.nanoTime();
+ _topRecords = _tableResizer.getTopRecords(_lookupMap, _trimSize, sort);
+ long resizeTimeNs = System.nanoTime() - startTimeNs;
+ _numResizes++;
+ _resizeTimeNs += resizeTimeNs;
+ } else {
+ _topRecords = _lookupMap.values();
+ }
+ }
+
+ @Override
+ public Iterator<Record> iterator() {
+ return _topRecords.iterator();
+ }
+
+ public int getNumResizes() {
+ return _numResizes;
+ }
- public abstract long getResizeTimeMs();
+ public long getResizeTimeMs() {
+ return TimeUnit.NANOSECONDS.toMillis(_resizeTimeNs);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
index 520c3e8..85288d4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
@@ -19,24 +19,19 @@
package org.apache.pinot.core.data.table;
/**
- * Helper class to store a subset of Record fields
- * IntermediateRecord is derived from a Record
- * Some of the main properties of an IntermediateRecord are:
- *
- * 1. Key in IntermediateRecord is expected to be identical to the one in the
Record
- * 2. For values, IntermediateRecord should only have the columns needed for
order by
- * 3. Inside the values, the columns should be ordered by the order by sequence
- * 4. For order by on aggregations, final results are extracted
- * 5. There is a mandatory field to store the original record to prevent from
duplicate looking up
+ * Helper class to store the values to be ordered. It also wraps the Key and
Record of the record.
+ * - When ordering on an aggregation, stores the final result of the
aggregation
+ * - When ordering on a column/transform, stores the actual value of the
expression
*/
+@SuppressWarnings("rawtypes")
public class IntermediateRecord {
public final Key _key;
- public final Comparable[] _values;
public final Record _record;
+ public final Comparable[] _values;
- IntermediateRecord(Key key, Comparable[] values, Record record) {
+ IntermediateRecord(Key key, Record record, Comparable[] values) {
_key = key;
- _values = values;
_record = record;
+ _values = values;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index 41c63c8..d9ec976 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -20,33 +20,21 @@ package org.apache.pinot.core.data.table;
import com.google.common.base.Preconditions;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link Table} implementation for aggregating TableRecords based on
combination of keys
*/
+@SuppressWarnings("unchecked")
@NotThreadSafe
public class SimpleIndexedTable extends IndexedTable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleIndexedTable.class);
-
- private Map<Key, Record> _lookupMap;
- private Iterator<Record> _iterator;
-
private boolean _noMoreNewRecords = false;
- private int _numResizes = 0;
- private long _resizeTimeMs = 0;
public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext,
int trimSize, int trimThreshold) {
- super(dataSchema, queryContext, trimSize, trimThreshold);
- _lookupMap = new HashMap<>();
+ super(dataSchema, queryContext, trimSize, trimThreshold, new HashMap<>());
}
/**
@@ -55,7 +43,8 @@ public class SimpleIndexedTable extends IndexedTable {
@Override
public boolean upsert(Key key, Record newRecord) {
Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
- if (_noMoreNewRecords) { // allow only existing record updates
+ if (_noMoreNewRecords) {
+ // allow only existing record updates
_lookupMap.computeIfPresent(key, (k, v) -> {
Object[] existingValues = v.getValues();
Object[] newValues = newRecord.getValues();
@@ -65,8 +54,8 @@ public class SimpleIndexedTable extends IndexedTable {
}
return v;
});
- } else { // allow all records
-
+ } else {
+ // allow all records
_lookupMap.compute(key, (k, v) -> {
if (v == null) {
return newRecord;
@@ -84,7 +73,7 @@ public class SimpleIndexedTable extends IndexedTable {
if (_lookupMap.size() >= _trimThreshold) {
if (_hasOrderBy) {
// reached max capacity, resize
- resize(_trimSize);
+ resize();
} else {
// reached max capacity and no order by. No more new records will be
accepted
_noMoreNewRecords = true;
@@ -93,61 +82,4 @@ public class SimpleIndexedTable extends IndexedTable {
}
return true;
}
-
- private void resize(int trimToSize) {
- long startTime = System.currentTimeMillis();
- _lookupMap = _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
- long endTime = System.currentTimeMillis();
- long timeElapsed = endTime - startTime;
- _numResizes++;
- _resizeTimeMs += timeElapsed;
- }
-
- private List<Record> resizeAndSort(int trimToSize) {
- long startTime = System.currentTimeMillis();
- List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap,
trimToSize);
- long endTime = System.currentTimeMillis();
- long timeElapsed = endTime - startTime;
- _numResizes++;
- _resizeTimeMs += timeElapsed;
- return sortedRecords;
- }
-
- @Override
- public int size() {
- return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
- }
-
- @Override
- public Iterator<Record> iterator() {
- return _iterator;
- }
-
- @Override
- public void finish(boolean sort) {
- if (_hasOrderBy) {
- if (sort) {
- _sortedRecords = resizeAndSort(_trimSize);
- _iterator = _sortedRecords.iterator();
- } else {
- resize(_trimSize);
- }
- LOGGER.debug(
- "Num resizes : {}, Total time spent in resizing : {}, Avg resize
time : {}, trimSize: {}, trimThreshold: {}",
- _numResizes, _resizeTimeMs, _numResizes == 0 ? 0 : _resizeTimeMs /
_numResizes, _trimSize, _trimThreshold);
- }
- if (_iterator == null) {
- _iterator = _lookupMap.values().iterator();
- }
- }
-
- @Override
- public int getNumResizes() {
- return _numResizes;
- }
-
- @Override
- public long getResizeTimeMs() {
- return _resizeTimeMs;
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index 3e98f6a..d1cd71d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.core.data.table;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -27,8 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
@@ -109,8 +109,8 @@ public class TableResizer {
return new GroupByExpressionExtractor(groupByExpressionIndex);
}
FunctionContext function = expression.getFunction();
- Preconditions
- .checkState(function != null, "Failed to find ORDER-BY expression: %s
in the GROUP-BY clause", expression);
+ Preconditions.checkState(function != null, "Failed to find ORDER-BY
expression: %s in the GROUP-BY clause",
+ expression);
if (function.getType() == FunctionContext.Type.AGGREGATION) {
// Aggregation function
return new
AggregationFunctionExtractor(_aggregationFunctionIndexMap.get(function));
@@ -121,62 +121,40 @@ public class TableResizer {
}
/**
- * Constructs an IntermediateRecord from Record
- * The IntermediateRecord::key is the same Record::key
- * The IntermediateRecord::values contains only the order by columns, in the
query's sort sequence
- * For aggregation values in the order by, the final result is extracted if
the intermediate result is non-comparable
+ * Constructs an IntermediateRecord by extracting the order-by values from
the record.
*/
private IntermediateRecord getIntermediateRecord(Key key, Record record) {
- Comparable[] intermediateRecordValues = new
Comparable[_numOrderByExpressions];
+ Comparable[] orderByValues = new Comparable[_numOrderByExpressions];
for (int i = 0; i < _numOrderByExpressions; i++) {
- intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
+ orderByValues[i] = _orderByValueExtractors[i].extract(record);
}
- return new IntermediateRecord(key, intermediateRecordValues, record);
+ return new IntermediateRecord(key, record, orderByValues);
}
/**
- * Trim recordsMap to trimToSize, based on order by information
- * Resize only if number of records is greater than trimToSize
- * The resizer smartly chooses to create PQ of records to evict or records
to retain, based on the number of
- * records and the number of records to evict
+ * Resizes the recordsMap to the given size.
*/
- public Map<Key, Record> resizeRecordsMap(Map<Key, Record> recordsMap, int
trimToSize) {
- int numRecordsToEvict = recordsMap.size() - trimToSize;
- if (numRecordsToEvict > 0) {
- // TODO: compare the performance of converting to IntermediateRecord vs
keeping Record, in cases where we do
- // not need to extract final results
- if (numRecordsToEvict < trimToSize) {
- // num records to evict is smaller than num records to retain
- // make PQ of records to evict
- PriorityQueue<IntermediateRecord> priorityQueue =
- convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict,
_intermediateRecordComparator);
- for (IntermediateRecord evictRecord : priorityQueue) {
- recordsMap.remove(evictRecord._key);
- }
- return recordsMap;
- } else {
- // num records to retain is smaller than num records to evict
- // make PQ of records to retain
- // TODO - Consider reusing the same map by removing record from the map
- // at the time it is evicted from PQ
- Map<Key, Record> trimmedRecordsMap;
- if (recordsMap instanceof ConcurrentMap) {
- // invoked by ConcurrentIndexedTable
- trimmedRecordsMap = new ConcurrentHashMap<>();
- } else {
- // invoked by SimpleIndexedTable
- trimmedRecordsMap = new HashMap<>();
- }
- Comparator<IntermediateRecord> comparator =
_intermediateRecordComparator.reversed();
- PriorityQueue<IntermediateRecord> priorityQueue =
- convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
- for (IntermediateRecord recordToRetain : priorityQueue) {
- trimmedRecordsMap.put(recordToRetain._key, recordToRetain._record);
- }
- return trimmedRecordsMap;
+ public void resizeRecordsMap(Map<Key, Record> recordsMap, int size) {
+ int numRecordsToEvict = recordsMap.size() - size;
+ if (numRecordsToEvict <= 0) {
+ return;
+ }
+ if (numRecordsToEvict <= size) {
+ // Fewer records to evict than retain, make PQ of records to evict
+ PriorityQueue<IntermediateRecord> priorityQueue =
+ convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict,
_intermediateRecordComparator);
+ for (IntermediateRecord recordToEvict : priorityQueue) {
+ recordsMap.remove(recordToEvict._key);
+ }
+ } else {
+ // Fewer records to retain than evict, make PQ of records to retain
+ PriorityQueue<IntermediateRecord> priorityQueue =
+ convertToIntermediateRecordsPQ(recordsMap, size,
_intermediateRecordComparator.reversed());
+ recordsMap.clear();
+ for (IntermediateRecord recordToRetain : priorityQueue) {
+ recordsMap.put(recordToRetain._key, recordToRetain._record);
}
}
- return recordsMap;
}
private PriorityQueue<IntermediateRecord>
convertToIntermediateRecordsPQ(Map<Key, Record> recordsMap, int size,
@@ -198,25 +176,44 @@ public class TableResizer {
}
/**
- * Sorts the recordsMap using a priority queue and returns a sorted list of
records
- * This method is to be called from IndexedTable::finish, if both resize and
sort is needed
+ * Returns the top records from the recordsMap.
*/
- public List<Record> sortRecordsMap(Map<Key, Record> recordsMap, int
trimToSize) {
+ public Collection<Record> getTopRecords(Map<Key, Record> recordsMap, int
size, boolean sort) {
+ return sort ? getSortedTopRecords(recordsMap, size) :
getUnsortedTopRecords(recordsMap, size);
+ }
+
+ @VisibleForTesting
+ List<Record> getSortedTopRecords(Map<Key, Record> recordsMap, int size) {
int numRecords = recordsMap.size();
if (numRecords == 0) {
return Collections.emptyList();
}
- int numRecordsToRetain = Math.min(numRecords, trimToSize);
- // make PQ of sorted records to retain
- PriorityQueue<IntermediateRecord> priorityQueue =
- convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain,
_intermediateRecordComparator.reversed());
- Record[] sortedArray = new Record[numRecordsToRetain];
- while (!priorityQueue.isEmpty()) {
- IntermediateRecord intermediateRecord = priorityQueue.poll();
- sortedArray[--numRecordsToRetain] = intermediateRecord._record;
- ;
+ size = Math.min(numRecords, size);
+ PriorityQueue<IntermediateRecord> topRecords =
+ convertToIntermediateRecordsPQ(recordsMap, size,
_intermediateRecordComparator.reversed());
+ Record[] sortedTopRecords = new Record[size];
+ while (size > 0) {
+ IntermediateRecord intermediateRecord = topRecords.poll();
+ assert intermediateRecord != null;
+ sortedTopRecords[--size] = intermediateRecord._record;
+ }
+ return Arrays.asList(sortedTopRecords);
+ }
+
+ private Collection<Record> getUnsortedTopRecords(Map<Key, Record>
recordsMap, int size) {
+ int numRecords = recordsMap.size();
+ if (numRecords <= size) {
+ return recordsMap.values();
+ } else {
+ PriorityQueue<IntermediateRecord> topRecords =
+ convertToIntermediateRecordsPQ(recordsMap, size,
_intermediateRecordComparator.reversed());
+ Record[] unsortedTopRecords = new Record[size];
+ int index = 0;
+ for (IntermediateRecord topRecord : topRecords) {
+ unsortedTopRecords[index++] = topRecord._record;
+ }
+ return Arrays.asList(unsortedTopRecords);
}
- return Arrays.asList(sortedArray);
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
index aa1bcb0..62accae 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.data.table;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -33,11 +35,13 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
noticed that lock-unlock overhead was > 1sec and this specialized concurrent
* indexed table avoids that by overriding just the upsert method
*/
-public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
+@SuppressWarnings("unchecked")
+public class UnboundedConcurrentIndexedTable extends IndexedTable {
+ private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext
queryContext, int trimSize,
int trimThreshold) {
- super(dataSchema, queryContext, trimSize, trimThreshold);
+ super(dataSchema, queryContext, trimSize, trimThreshold, new
ConcurrentHashMap<>());
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 8826b58..579b8ce 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
@@ -73,7 +74,7 @@ public class GroupByOrderByCombineOperator extends
BaseCombineOperator {
// _futures (try to interrupt the execution if it already started).
private final CountDownLatch _operatorLatch;
private DataSchema _dataSchema;
- private ConcurrentIndexedTable _indexedTable;
+ private IndexedTable _indexedTable;
public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext
queryContext,
ExecutorService executorService, long endTimeMs, int minTrimSize, int
trimThreshold) {
@@ -210,8 +211,8 @@ public class GroupByOrderByCombineOperator extends
BaseCombineOperator {
boolean opCompleted = _operatorLatch.await(timeoutMs,
TimeUnit.MILLISECONDS);
if (!opCompleted) {
// If this happens, the broker side should already timed out, just log
the error and return
- String errorMessage = String
- .format("Timed out while combining group-by order-by results after
%dms, queryContext = %s", timeoutMs,
+ String errorMessage =
+ String.format("Timed out while combining group-by order-by results
after %dms, queryContext = %s", timeoutMs,
_queryContext);
LOGGER.error(errorMessage);
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
index 91fcb81..7bad89f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
@@ -77,8 +77,8 @@ public class TableResizerTest {
_keys = Arrays.asList(new Key(new Object[]{"a", 10, 1.0}), new Key(new
Object[]{"b", 10, 2.0}),
new Key(new Object[]{"c", 200, 3.0}), new Key(new Object[]{"c", 50,
4.0}),
new Key(new Object[]{"c", 300, 5.0}));
- List<Object[]> objectArray = Arrays
- .asList(new Object[]{"a", 10, 1.0}, new Object[]{"b", 10, 2.0}, new
Object[]{"c", 200, 3.0},
+ List<Object[]> objectArray =
+ Arrays.asList(new Object[]{"a", 10, 1.0}, new Object[]{"b", 10, 2.0},
new Object[]{"c", 200, 3.0},
new Object[]{"c", 50, 4.0}, new Object[]{"c", 300, 5.0});
// Use _keys for _groupKeys
@@ -212,7 +212,7 @@ public class TableResizerTest {
tableResizer =
new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
recordsMap = new HashMap<>(_recordsMap);
- recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b
assertTrue(recordsMap.containsKey(_keys.get(1)));
@@ -221,7 +221,7 @@ public class TableResizerTest {
tableResizer =
new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
recordsMap = new HashMap<>(_recordsMap);
- recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3
assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -230,7 +230,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX +
"DISTINCTCOUNT(m3) DESC, d1"));
recordsMap = new HashMap<>(_recordsMap);
- recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(4))); // 4, 3
assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -239,7 +239,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 /
(DISTINCTCOUNT(m3) + 1), d1 DESC"));
recordsMap = new HashMap<>(_recordsMap);
- recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(1))); // 3.33, 12.5
assertTrue(recordsMap.containsKey(_keys.get(0)));
@@ -249,26 +249,26 @@ public class TableResizerTest {
* Tests the sort function for ordered resizer
*/
@Test
- public void testResizeAndSortRecordsMap() {
+ public void testSortTopRecords() {
// d1 asc
TableResizer tableResizer =
new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
Map<Key, Record> recordsMap = new HashMap<>(_recordsMap);
- List<Record> sortedRecords = tableResizer.sortRecordsMap(recordsMap,
TRIM_TO_SIZE);
+ List<Record> sortedRecords = tableResizer.getSortedTopRecords(recordsMap,
TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(0)); // a, b
assertEquals(sortedRecords.get(1), _records.get(1));
// d1 asc - trim to 1
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, 1);
assertEquals(sortedRecords.get(0), _records.get(0)); // a
// d1 asc, d3 desc (tie breaking with 2nd comparator)
tableResizer =
new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, d3
DESC"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(0)); // a, b, c (300)
assertEquals(sortedRecords.get(1), _records.get(1));
@@ -276,7 +276,7 @@ public class TableResizerTest {
// d1 asc, d3 desc (tie breaking with 2nd comparator) - trim to 1
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, 1);
assertEquals(sortedRecords.size(), 1);
assertEquals(sortedRecords.get(0), _records.get(0)); // a
@@ -284,7 +284,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1,
SUM(m1) DESC, max(m2) DESC"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(0)); // a, b, c (30, 300)
assertEquals(sortedRecords.get(1), _records.get(1));
@@ -292,7 +292,7 @@ public class TableResizerTest {
// d1 asc, sum(m1) desc, max(m2) desc - trim to 1
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, 1);
assertEquals(sortedRecords.size(), 1);
assertEquals(sortedRecords.get(0), _records.get(0)); // a
@@ -300,7 +300,7 @@ public class TableResizerTest {
tableResizer =
new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(4)); // 2, 3, 3.33
assertEquals(sortedRecords.get(1), _records.get(3));
@@ -310,7 +310,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX +
"DISTINCTCOUNT(m3) DESC, d1"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(4)); // 4, 3, 2 (b)
assertEquals(sortedRecords.get(1), _records.get(3));
@@ -320,7 +320,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 /
(DISTINCTCOUNT(m3) + 1), d1 DESC"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(1)); // 3.33, 12.5, 5
assertEquals(sortedRecords.get(1), _records.get(0));
@@ -347,8 +347,8 @@ public class TableResizerTest {
assertEquals(resultArray[1]._record, _records.get(3));
assertEquals(resultArray[2]._record, _records.get(4));
- tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils
- .getQueryContextFromSQL(QUERY_PREFIX + "SUM(m1) DESC, max(m2) DESC,
DISTINCTCOUNT(m3) DESC"));
+ tableResizer = new TableResizer(DATA_SCHEMA,
QueryContextConverterUtils.getQueryContextFromSQL(
+ QUERY_PREFIX + "SUM(m1) DESC, max(m2) DESC, DISTINCTCOUNT(m3) DESC"));
results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(),
_groupByResultHolders, TRIM_TO_SIZE);
assertEquals(results.size(), TRIM_TO_SIZE);
for (int i = 0; i < TRIM_TO_SIZE; ++i) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]