This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fa7b0e4 Perf optimization for SQL GROUP BY ORDER BY (#6225)
fa7b0e4 is described below
commit fa7b0e4ea05a02855b87e70b50bfd3f76e6afe02
Author: Sidd <[email protected]>
AuthorDate: Thu Nov 19 02:31:53 2020 -0800
Perf optimization for SQL GROUP BY ORDER BY (#6225)
* SQL group by order by perf optimization
* cleanup
Co-authored-by: Siddharth Teotia <[email protected]>
---
.../apache/pinot/common/metrics/BrokerMeter.java | 3 +
.../apache/pinot/common/metrics/ServerMeter.java | 2 +
.../apache/pinot/common/utils/CommonConstants.java | 4 +
.../org/apache/pinot/common/utils/DataTable.java | 2 +
.../core/data/table/ConcurrentIndexedTable.java | 74 +++++++++---------
.../apache/pinot/core/data/table/IndexedTable.java | 49 +++++++-----
.../pinot/core/data/table/SimpleIndexedTable.java | 57 +++++++-------
.../apache/pinot/core/data/table/TableResizer.java | 90 ++++++++--------------
.../table/UnboundedConcurrentIndexedTable.java | 80 +++++++++++++++++++
.../operator/blocks/IntermediateResultsBlock.java | 20 ++++-
.../combine/GroupByOrderByCombineOperator.java | 27 +++++--
.../apache/pinot/core/plan/CombinePlanNode.java | 10 ++-
.../core/plan/maker/InstancePlanMakerImplV2.java | 16 +++-
.../core/query/reduce/BrokerReduceService.java | 8 +-
.../core/query/reduce/DataTableReducerContext.java | 11 ++-
.../core/query/reduce/GroupByDataTableReducer.java | 37 +++++++--
.../pinot/core/query/scheduler/QueryScheduler.java | 10 +++
.../pinot/core/data/table/IndexedTableTest.java | 25 +++---
.../pinot/core/data/table/TableResizerTest.java | 26 +++----
.../operator/combine/CombineSlowOperatorsTest.java | 2 +-
.../combine/SelectionCombineOperatorTest.java | 3 +-
.../pinot/core/plan/CombinePlanNodeTest.java | 9 ++-
.../apache/pinot/perf/BenchmarkCombineGroupBy.java | 6 +-
.../apache/pinot/perf/BenchmarkIndexedTable.java | 7 +-
24 files changed, 370 insertions(+), 208 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index f295435..ac4129f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -64,6 +64,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
ENTRIES_SCANNED_IN_FILTER("documents", false),
ENTRIES_SCANNED_POST_FILTER("documents", false),
+ NUM_RESIZES("resizes", false),
+ RESIZE_TIME_MS("resizeTimeMs", false),
+
REQUEST_CONNECTION_TIMEOUTS("timeouts", false),
HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index c680661..39cc24e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -64,6 +64,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REFRESH_FAILURES("segments", false),
UNTAR_FAILURES("segments", false),
SEGMENT_DOWNLOAD_FAILURES("segments", false),
+ NUM_RESIZES("numResizes", false),
+ RESIZE_TIME_MS("resizeTimeMs", false),
// Netty connection metrics
NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index afb1852..9773e7e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -173,6 +173,10 @@ public class CommonConstants {
public static final int DEFAULT_MAX_REDUCE_THREADS_PER_QUERY =
    Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2)); // Same logic as CombineOperatorUtils
+ // used for SQL GROUP BY during broker reduce
+ public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
+ public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+
public static class Request {
public static final String PQL = "pql";
public static final String SQL = "sql";
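A minimal, self-contained sketch (illustrative only, not Pinot's actual config loading) of how this new broker key and its default interact; per the comments elsewhere in this commit, setting the threshold to 1B or higher effectively disables trimming during broker reduce:

    import java.util.Properties;

    public class BrokerTrimThresholdExample {
      // Key and default copied from the constants added above.
      static final String KEY = "pinot.broker.groupby.trim.threshold";
      static final int DEFAULT = 1_000_000;

      public static void main(String[] args) {
        Properties brokerConf = new Properties();
        // Uncomment for PQL-parity behaviour (no trimming during broker reduce):
        // brokerConf.setProperty(KEY, "1000000000");
        int trimThreshold = Integer.parseInt(brokerConf.getProperty(KEY, String.valueOf(DEFAULT)));
        System.out.println("groupByTrimThreshold = " + trimThreshold);
      }
    }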
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 527669c..181dc5a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -42,6 +42,8 @@ public interface DataTable {
String TIME_USED_MS_METADATA_KEY = "timeUsedMs";
String TRACE_INFO_METADATA_KEY = "traceInfo";
String REQUEST_ID_METADATA_KEY = "requestId";
+ String NUM_RESIZES_METADATA_KEY = "numResizes";
+ String RESIZE_TIME_MS_METADATA_KEY = "resizeTimeMs";
void addException(ProcessingException processingException);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 2ff4ccd..1bfe913 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -39,17 +39,16 @@ import org.slf4j.LoggerFactory;
public class ConcurrentIndexedTable extends IndexedTable {
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIndexedTable.class);
- private final ConcurrentMap<Key, Record> _lookupMap;
- private final ReentrantReadWriteLock _readWriteLock;
+ protected volatile ConcurrentMap<Key, Record> _lookupMap;
+ protected final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
private Iterator<Record> _iterator;
-
- private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+ private final ReentrantReadWriteLock _readWriteLock;
private final AtomicInteger _numResizes = new AtomicInteger();
- private final AtomicLong _resizeTime = new AtomicLong();
-
- public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
- super(dataSchema, queryContext, capacity);
+ private final AtomicLong _resizeTimeMs = new AtomicLong();
+ public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int trimSize, int trimThreshold) {
+ super(dataSchema, queryContext, trimSize, trimThreshold);
_lookupMap = new ConcurrentHashMap<>();
_readWriteLock = new ReentrantReadWriteLock();
}
@@ -59,9 +58,7 @@ public class ConcurrentIndexedTable extends IndexedTable {
*/
@Override
public boolean upsert(Key key, Record newRecord) {
-
Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
-
if (_noMoreNewRecords.get()) { // allow only existing record updates
_lookupMap.computeIfPresent(key, (k, v) -> {
Object[] existingValues = v.getValues();
@@ -93,14 +90,14 @@ public class ConcurrentIndexedTable extends IndexedTable {
_readWriteLock.readLock().unlock();
}
- // resize if exceeds max capacity
- if (_lookupMap.size() >= _maxCapacity) {
+ // resize if exceeds trim threshold
+ if (_lookupMap.size() >= _trimThreshold) {
if (_hasOrderBy) {
// reached capacity, resize
_readWriteLock.writeLock().lock();
try {
- if (_lookupMap.size() >= _maxCapacity) {
- resize(_capacity);
+ if (_lookupMap.size() >= _trimThreshold) {
+ resize(_trimSize);
}
} finally {
_readWriteLock.writeLock().unlock();
@@ -116,7 +113,7 @@ public class ConcurrentIndexedTable extends IndexedTable {
@Override
public int size() {
- return _lookupMap.size();
+ return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
}
@Override
@@ -125,52 +122,55 @@ public class ConcurrentIndexedTable extends IndexedTable {
}
private void resize(int trimToSize) {
-
long startTime = System.currentTimeMillis();
-
- _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
-
+ // When the resizer trims using a PQ of records to retain, it returns a new trimmed map,
+ // so the reference held by the indexed table needs to be updated. This is also why
+ // _lookupMap is volatile: the resizing thread publishes a new reference.
+ _lookupMap = (ConcurrentMap) _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
long endTime = System.currentTimeMillis();
long timeElapsed = endTime - startTime;
-
_numResizes.incrementAndGet();
- _resizeTime.addAndGet(timeElapsed);
+ _resizeTimeMs.addAndGet(timeElapsed);
}
private List<Record> resizeAndSort(int trimToSize) {
-
long startTime = System.currentTimeMillis();
-
- List<Record> sortedRecords = _tableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
-
+ List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap, trimToSize);
long endTime = System.currentTimeMillis();
long timeElapsed = endTime - startTime;
-
_numResizes.incrementAndGet();
- _resizeTime.addAndGet(timeElapsed);
-
+ _resizeTimeMs.addAndGet(timeElapsed);
return sortedRecords;
}
@Override
public void finish(boolean sort) {
-
if (_hasOrderBy) {
-
if (sort) {
- List<Record> sortedRecords = resizeAndSort(_capacity);
- _iterator = sortedRecords.iterator();
+ _sortedRecords = resizeAndSort(_trimSize);
+ _iterator = _sortedRecords.iterator();
} else {
- resize(_capacity);
+ resize(_trimSize);
}
int numResizes = _numResizes.get();
- long resizeTime = _resizeTime.get();
- LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg
resize time : {}", numResizes, resizeTime,
- numResizes == 0 ? 0 : resizeTime / numResizes);
+ long resizeTime = _resizeTimeMs.get();
+ LOGGER.debug(
+ "Num resizes : {}, Total time spent in resizing : {}, Avg resize
time : {}, trimSize: {}, trimThreshold: {}",
+ numResizes, resizeTime, numResizes == 0 ? 0 : resizeTime /
numResizes, _trimSize, _trimThreshold);
}
-
if (_iterator == null) {
_iterator = _lookupMap.values().iterator();
}
}
+
+ @Override
+ public int getNumResizes() {
+ return _numResizes.get();
+ }
+
+ @Override
+ public long getResizeTimeMs() {
+ return _resizeTimeMs.get();
+ }
}
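To make the resize comment above concrete, here is a minimal standalone sketch (names are illustrative, not the actual Pinot classes) of the pattern ConcurrentIndexedTable relies on: upserting threads hold the read lock, the resizing thread holds the write lock, and the trimmed map produced by the resizer is published through a volatile field:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class VolatileMapSwapSketch<K, V> {
      // volatile so the reference swapped in by resize() is visible to upserting threads
      private volatile ConcurrentMap<K, V> _lookupMap = new ConcurrentHashMap<>();
      private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock();

      public void upsert(K key, V value) {
        _readWriteLock.readLock().lock();
        try {
          _lookupMap.put(key, value); // the real table merges aggregation values here
        } finally {
          _readWriteLock.readLock().unlock();
        }
      }

      public void resize(Map<K, V> trimmed) {
        _readWriteLock.writeLock().lock();
        try {
          // The resizer may hand back a brand new (smaller) map, so swap the reference.
          _lookupMap = new ConcurrentHashMap<>(trimmed);
        } finally {
          _readWriteLock.writeLock().unlock();
        }
      }
    }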
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index 5867256..04feade 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -36,40 +36,43 @@ public abstract class IndexedTable extends BaseTable {
protected final AggregationFunction[] _aggregationFunctions;
protected final boolean _hasOrderBy;
protected final TableResizer _tableResizer;
+ protected List<Record> _sortedRecords;
+ // The size we need to trim to
+ protected final int _trimSize;
+ // The size with added buffer, in order to collect more records than capacity for better precision
+ protected final int _trimThreshold;
- // The capacity we need to trim to
- protected final int _capacity;
- // The capacity with added buffer, in order to collect more records than capacity for better precision
- protected final int _maxCapacity;
-
- protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
+ protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int trimSize, int trimThreshold) {
super(dataSchema);
-
List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
assert groupByExpressions != null;
_numKeyColumns = groupByExpressions.size();
-
_aggregationFunctions = queryContext.getAggregationFunctions();
-
List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
if (orderByExpressions != null) {
+ // SQL GROUP BY with ORDER BY
+ // trimSize = max(limit N * 5, 5000) (see GroupByUtils.getTableCapacity).
+ // trimSize is also bounded by trimThreshold/2 to protect the server when the
+ // user specifies a very high value of LIMIT N.
+ // trimThreshold is configurable. To keep parity with PQL for some use cases
+ // with an infinitely large group by, trimThreshold will be >= 1B (exactly the
+ // same as PQL). This essentially implies there will be no resizing/trimming
+ // during upsert and exactly one trim during finish.
_hasOrderBy = true;
_tableResizer = new TableResizer(dataSchema, queryContext);
- _capacity = capacity;
-
- // TODO: tune these numbers and come up with a better formula (github ISSUE-4801)
- // Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain recors from the PQ
- if (capacity <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize
-   _maxCapacity = 1_000_000;
- } else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize
-   _maxCapacity = (int) (capacity * 1.2);
- }
+ _trimSize = Math.min(trimSize, trimThreshold / 2);
+ _trimThreshold = trimThreshold;
} else {
+ // SQL GROUP BY without ORDER BY
+ // trimSize = LIMIT N (see GroupByUtils.getTableCapacity)
+ // trimThreshold is same as trimSize since indexed table stops
+ // accepting records once map size reaches trimSize. there is no
+ // resize/trim during upsert since the results can be arbitrary
+ // and are truncated once they reach trimSize
_hasOrderBy = false;
_tableResizer = null;
- _capacity = capacity;
- _maxCapacity = capacity;
+ _trimSize = trimSize;
+ _trimThreshold = trimSize;
}
}
@@ -80,4 +83,8 @@ public abstract class IndexedTable extends BaseTable {
Object[] keyValues = Arrays.copyOf(record.getValues(), _numKeyColumns);
return upsert(new Key(keyValues), record);
}
+
+ public abstract int getNumResizes();
+
+ public abstract long getResizeTimeMs();
}
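For intuition, a small sketch (assumptions: the max(LIMIT N * 5, 5000) formula quoted from the comments above, and the default trim threshold of 1,000,000) showing how the two knobs combine:

    public class TrimSizeSketch {
      static int trimSize(int limit, int trimThreshold) {
        int tableCapacity = Math.max(limit * 5, 5_000);    // per the comment: max(LIMIT N * 5, 5000)
        return Math.min(tableCapacity, trimThreshold / 2); // bounded to protect the server
      }

      public static void main(String[] args) {
        System.out.println(trimSize(100, 1_000_000));        // LIMIT 100 -> 5000
        System.out.println(trimSize(10_000_000, 1_000_000)); // huge LIMIT -> capped at 500000
      }
    }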
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index 05b5359..a7e133a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -37,16 +37,15 @@ import org.slf4j.LoggerFactory;
public class SimpleIndexedTable extends IndexedTable {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIndexedTable.class);
- private final Map<Key, Record> _lookupMap;
+ private Map<Key, Record> _lookupMap;
private Iterator<Record> _iterator;
private boolean _noMoreNewRecords = false;
private int _numResizes = 0;
- private long _resizeTime = 0;
-
- public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
- super(dataSchema, queryContext, capacity);
+ private long _resizeTimeMs = 0;
+ public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int trimSize, int trimThreshold) {
+ super(dataSchema, queryContext, trimSize, trimThreshold);
_lookupMap = new HashMap<>();
}
@@ -56,7 +55,6 @@ public class SimpleIndexedTable extends IndexedTable {
@Override
public boolean upsert(Key key, Record newRecord) {
Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
-
if (_noMoreNewRecords) { // allow only existing record updates
_lookupMap.computeIfPresent(key, (k, v) -> {
Object[] existingValues = v.getValues();
@@ -83,10 +81,10 @@ public class SimpleIndexedTable extends IndexedTable {
}
});
- if (_lookupMap.size() >= _maxCapacity) {
+ if (_lookupMap.size() >= _trimThreshold) {
if (_hasOrderBy) {
// reached max capacity, resize
- resize(_capacity);
+ resize(_trimSize);
} else {
// reached max capacity and no order by. No more new records will be accepted
_noMoreNewRecords = true;
@@ -97,36 +95,27 @@ public class SimpleIndexedTable extends IndexedTable {
}
private void resize(int trimToSize) {
-
long startTime = System.currentTimeMillis();
-
- _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
-
+ _lookupMap = _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
long endTime = System.currentTimeMillis();
long timeElapsed = endTime - startTime;
-
_numResizes++;
- _resizeTime += timeElapsed;
+ _resizeTimeMs += timeElapsed;
}
private List<Record> resizeAndSort(int trimToSize) {
-
long startTime = System.currentTimeMillis();
-
- List<Record> sortedRecords = _tableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
-
+ List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap, trimToSize);
long endTime = System.currentTimeMillis();
long timeElapsed = endTime - startTime;
-
_numResizes++;
- _resizeTime += timeElapsed;
-
+ _resizeTimeMs += timeElapsed;
return sortedRecords;
}
@Override
public int size() {
- return _lookupMap.size();
+ return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
}
@Override
@@ -136,22 +125,28 @@ public class SimpleIndexedTable extends IndexedTable {
@Override
public void finish(boolean sort) {
-
if (_hasOrderBy) {
-
if (sort) {
- List<Record> sortedRecords = resizeAndSort(_capacity);
- _iterator = sortedRecords.iterator();
+ _sortedRecords = resizeAndSort(_trimSize);
+ _iterator = _sortedRecords.iterator();
} else {
- resize(_capacity);
+ resize(_trimSize);
}
- LOGGER
-     .debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", _numResizes, _resizeTime,
-         _numResizes == 0 ? 0 : _resizeTime / _numResizes);
+ LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}, trimSize: {}, trimThreshold: {}",
+     _numResizes, _resizeTimeMs, _numResizes == 0 ? 0 : _resizeTimeMs / _numResizes, _trimSize, _trimThreshold);
}
-
if (_iterator == null) {
_iterator = _lookupMap.values().iterator();
}
}
+
+ @Override
+ public int getNumResizes() {
+ return _numResizes;
+ }
+
+ @Override
+ public long getResizeTimeMs() {
+ return _resizeTimeMs;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index 256f8bd..2b1470f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -136,39 +138,48 @@ public class TableResizer {
* Resize only if number of records is greater than trimToSize
* The resizer smartly chooses to create PQ of records to evict or records to retain, based on the number of records and the number of records to evict
*/
- public void resizeRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+ public Map<Key, Record> resizeRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
int numRecordsToEvict = recordsMap.size() - trimToSize;
-
if (numRecordsToEvict > 0) {
// TODO: compare the performance of converting to IntermediateRecord vs keeping Record, in cases where we do not need to extract final results
-
- if (numRecordsToEvict < trimToSize) { // num records to evict is smaller than num records to retain
+ if (numRecordsToEvict < trimToSize) {
+ // num records to evict is smaller than num records to retain
// make PQ of records to evict
PriorityQueue<IntermediateRecord> priorityQueue =
    convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, _intermediateRecordComparator);
for (IntermediateRecord evictRecord : priorityQueue) {
recordsMap.remove(evictRecord._key);
}
- } else { // num records to retain is smaller than num records to evict
+ return recordsMap;
+ } else {
+ // num records to retain is smaller than num records to evict
// make PQ of records to retain
+ // TODO - Consider reusing the same map by removing record from the map
+ // at the time it is evicted from PQ
+ Map<Key, Record> trimmedRecordsMap;
+ if (recordsMap instanceof ConcurrentMap) {
+ // invoked by ConcurrentIndexedTable
+ trimmedRecordsMap = new ConcurrentHashMap<>();
+ } else {
+ // invoked by SimpleIndexedTable
+ trimmedRecordsMap = new HashMap<>();
+ }
Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed();
PriorityQueue<IntermediateRecord> priorityQueue =
    convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
- ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(priorityQueue.size());
- for (IntermediateRecord retainRecord : priorityQueue) {
- keysToRetain.add(retainRecord._key);
+ for (IntermediateRecord recordToRetain : priorityQueue) {
+ trimmedRecordsMap.put(recordToRetain._key, recordsMap.get(recordToRetain._key));
}
- recordsMap.keySet().retainAll(keysToRetain);
+ return trimmedRecordsMap;
}
}
+ return recordsMap;
}
private PriorityQueue<IntermediateRecord> convertToIntermediateRecordsPQ(Map<Key, Record> recordsMap, int size,
    Comparator<IntermediateRecord> comparator) {
PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator);
-
for (Map.Entry<Key, Record> entry : recordsMap.entrySet()) {
-
IntermediateRecord intermediateRecord = getIntermediateRecord(entry.getKey(), entry.getValue());
if (priorityQueue.size() < size) {
priorityQueue.offer(intermediateRecord);
@@ -183,62 +194,25 @@ public class TableResizer {
return priorityQueue;
}
- private List<Record> sortRecordsMap(Map<Key, Record> recordsMap) {
- int numRecords = recordsMap.size();
- List<Record> sortedRecords = new ArrayList<>(numRecords);
- List<IntermediateRecord> intermediateRecords = new ArrayList<>(numRecords);
- for (Map.Entry<Key, Record> entry : recordsMap.entrySet()) {
- intermediateRecords.add(getIntermediateRecord(entry.getKey(), entry.getValue()));
- }
- intermediateRecords.sort(_intermediateRecordComparator);
- for (IntermediateRecord intermediateRecord : intermediateRecords) {
- sortedRecords.add(recordsMap.get(intermediateRecord._key));
- }
- return sortedRecords;
- }
-
/**
- * Resizes the recordsMap and returns a sorted list of records.
+ * Sorts the recordsMap using a priority queue and returns a sorted list of records
* This method is to be called from IndexedTable::finish, if both resize and sort is needed
- *
- * If numRecordsToEvict > numRecordsToRetain, resize with PQ of records to evict, and then sort
- * Else, resize with PQ of record to retain, then use the PQ to create sorted list
- */
- public List<Record> resizeAndSortRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+ public List<Record> sortRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
int numRecords = recordsMap.size();
if (numRecords == 0) {
return Collections.emptyList();
}
-
int numRecordsToRetain = Math.min(numRecords, trimToSize);
- int numRecordsToEvict = numRecords - numRecordsToRetain;
-
- if (numRecordsToEvict < numRecordsToRetain) { // num records to evict is smaller than num records to retain
- if (numRecordsToEvict > 0) {
- // make PQ of records to evict
- PriorityQueue<IntermediateRecord> priorityQueue =
- convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict,
_intermediateRecordComparator);
- for (IntermediateRecord evictRecord : priorityQueue) {
- recordsMap.remove(evictRecord._key);
- }
- }
- return sortRecordsMap(recordsMap);
- } else {
- // make PQ of records to retain
- PriorityQueue<IntermediateRecord> priorityQueue =
- convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain,
_intermediateRecordComparator.reversed());
- // use PQ to get sorted list
- Record[] sortedArray = new Record[numRecordsToRetain];
- ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(numRecordsToRetain);
- while (!priorityQueue.isEmpty()) {
- IntermediateRecord intermediateRecord = priorityQueue.poll();
- keysToRetain.add(intermediateRecord._key);
- Record record = recordsMap.get(intermediateRecord._key);
- sortedArray[--numRecordsToRetain] = record;
- }
- recordsMap.keySet().retainAll(keysToRetain);
- return Arrays.asList(sortedArray);
+ // make PQ of sorted records to retain
+ PriorityQueue<IntermediateRecord> priorityQueue =
+     convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed());
+ Record[] sortedArray = new Record[numRecordsToRetain];
+ while (!priorityQueue.isEmpty()) {
+ IntermediateRecord intermediateRecord = priorityQueue.poll();
+ Record record = recordsMap.get(intermediateRecord._key);
+ sortedArray[--numRecordsToRetain] = record;
}
+ return Arrays.asList(sortedArray);
}
/**
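The core trick shared by resizeRecordsMap and sortRecordsMap is a size-bounded priority queue ordered by the reversed comparator, whose head is always the weakest retained element. A generic, self-contained illustration of that technique (not Pinot code):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class TopKSketch {
      static List<Integer> topK(List<Integer> values, int k, Comparator<Integer> order) {
        Comparator<Integer> heapOrder = order.reversed(); // heap head = weakest retained element
        PriorityQueue<Integer> pq = new PriorityQueue<>(k, heapOrder);
        for (Integer v : values) {
          if (pq.size() < k) {
            pq.offer(v);
          } else if (heapOrder.compare(v, pq.peek()) > 0) {
            pq.poll();   // evict the weakest
            pq.offer(v); // retain the better element
          }
        }
        // Drain weakest-first and fill the result backwards to get the final order.
        Integer[] sorted = new Integer[pq.size()];
        int i = pq.size();
        while (!pq.isEmpty()) {
          sorted[--i] = pq.poll();
        }
        return Arrays.asList(sorted);
      }

      public static void main(String[] args) {
        // Keep the 3 smallest values, sorted ascending -> [1, 2, 3]
        System.out.println(topK(Arrays.asList(7, 1, 9, 3, 5, 2), 3, Comparator.naturalOrder()));
      }
    }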
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
new file mode 100644
index 0000000..18d30cf
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Another version of {@link ConcurrentIndexedTable} used for use cases
+ * configured to have an infinite group by (num groups limit set to 1B or higher).
+ * For such cases, there won't be any resizing/trimming during upsert since
+ * trimThreshold is very high. Thus, we can avoid the overhead of the readLock's
+ * lock and unlock operations, which are otherwise used in ConcurrentIndexedTable
+ * to prevent a race between two threads doing upsert where one of them ends
+ * up trimming from upsert. For use cases with a very large number of groups, we had
+ * noticed that the lock-unlock overhead was > 1 sec; this specialized concurrent
+ * indexed table avoids that by overriding just the upsert method.
+ */
+public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
+
+ public UnboundedConcurrentIndexedTable(DataSchema dataSchema,
+ QueryContext queryContext, int trimSize, int trimThreshold) {
+ super(dataSchema, queryContext, trimSize, trimThreshold);
+ }
+
+ @Override
+ public boolean upsert(Key key, Record newRecord) {
+ if (_noMoreNewRecords.get()) {
+ // allow only existing record updates
+ _lookupMap.computeIfPresent(key, (k, v) -> {
+ Object[] existingValues = v.getValues();
+ Object[] newValues = newRecord.getValues();
+ int aggNum = 0;
+ for (int i = _numKeyColumns; i < _numColumns; i++) {
+ existingValues[i] = _aggregationFunctions[aggNum++].merge(existingValues[i], newValues[i]);
+ }
+ return v;
+ });
+ } else {
+ // allow all records
+ _lookupMap.compute(key, (k, v) -> {
+ if (v == null) {
+ return newRecord;
+ } else {
+ Object[] existingValues = v.getValues();
+ Object[] newValues = newRecord.getValues();
+ int aggNum = 0;
+ for (int i = _numKeyColumns; i < _numColumns; i++) {
+ existingValues[i] = _aggregationFunctions[aggNum++].merge(existingValues[i], newValues[i]);
+ }
+ return v;
+ }
+ });
+
+ if (_lookupMap.size() >= _trimSize && !_hasOrderBy) {
+ // reached capacity and no order by. No more new records will be accepted
+ _noMoreNewRecords.set(true);
+ }
+ }
+ return true;
+ }
+}
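A quick standalone illustration (not Pinot code) of why the read/write lock can be dropped here: ConcurrentHashMap.compute applies the remapping function atomically per key, so concurrent merges on the same key cannot lose updates:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ComputeMergeSketch {
      private static final int ITERATIONS = 100_000;

      public static void main(String[] args) throws InterruptedException {
        ConcurrentMap<String, long[]> map = new ConcurrentHashMap<>();
        Runnable merger = () -> {
          for (int i = 0; i < ITERATIONS; i++) {
            map.compute("key", (k, v) -> {
              if (v == null) {
                return new long[]{1};
              }
              v[0] += 1; // mimics merging an aggregation value in place
              return v;
            });
          }
        };
        Thread t1 = new Thread(merger);
        Thread t2 = new Thread(merger);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(map.get("key")[0]); // always 200000
      }
    }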
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 79ac520..09784fa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -40,10 +40,13 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
+import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -64,6 +67,8 @@ public class IntermediateResultsBlock implements Block {
private int _numSegmentsProcessed;
private int _numSegmentsMatched;
private boolean _numGroupsLimitReached;
+ private int _numResizes;
+ private long _resizeTimeMs;
private Table _table;
@@ -218,6 +223,14 @@ public class IntermediateResultsBlock implements Block {
_numGroupsLimitReached = numGroupsLimitReached;
}
+ public void setNumResizes(int numResizes) {
+ _numResizes = numResizes;
+ }
+
+ public void setResizeTimeMs(long resizeTimeMs) {
+ _resizeTimeMs = resizeTimeMs;
+ }
+
@VisibleForTesting
public long getNumDocsScanned() {
return _numDocsScanned;
@@ -278,17 +291,16 @@ public class IntermediateResultsBlock implements Block {
private DataTable getResultDataTable()
throws IOException {
-
DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
Iterator<Record> iterator = _table.iterator();
+ ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
while (iterator.hasNext()) {
Record record = iterator.next();
dataTableBuilder.startRow();
int columnIndex = 0;
for (Object value : record.getValues()) {
- ColumnDataType columnDataType = _dataSchema.getColumnDataType(columnIndex);
- setDataTableColumn(columnDataType, dataTableBuilder, columnIndex, value);
+ setDataTableColumn(columnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
columnIndex++;
}
dataTableBuilder.finishRow();
@@ -416,6 +428,8 @@ public class IntermediateResultsBlock implements Block {
    .put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, String.valueOf(_numEntriesScannedPostFilter));
dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_PROCESSED, String.valueOf(_numSegmentsProcessed));
dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_MATCHED, String.valueOf(_numSegmentsMatched));
+ dataTable.getMetadata().put(DataTable.NUM_RESIZES_METADATA_KEY, String.valueOf(_numResizes));
+ dataTable.getMetadata().put(DataTable.RESIZE_TIME_MS_METADATA_KEY, String.valueOf(_resizeTimeMs));
dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(_numTotalDocs));
if (_numGroupsLimitReached) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 37af764..0d5ce5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -62,24 +63,27 @@ import org.slf4j.LoggerFactory;
public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResultsBlock> {
private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator";
+ public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
private final List<Operator> _operators;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
private final long _endTimeMs;
- private final int _indexedTableCapacity;
+ private final int _trimSize;
+ private final int _trimThreshold;
private final Lock _initLock;
private DataSchema _dataSchema;
private ConcurrentIndexedTable _indexedTable;
public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
- ExecutorService executorService, long endTimeMs) {
+ ExecutorService executorService, long endTimeMs, int trimThreshold) {
_operators = operators;
_queryContext = queryContext;
_executorService = executorService;
_endTimeMs = endTimeMs;
_initLock = new ReentrantLock();
- _indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext);
+ _trimSize = GroupByUtils.getTableCapacity(_queryContext);
+ _trimThreshold = trimThreshold;
}
/**
@@ -140,7 +144,16 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
try {
if (_dataSchema == null) {
_dataSchema = intermediateResultsBlock.getDataSchema();
- _indexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, _indexedTableCapacity);
+ if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+   // special case of trim threshold where it is set to max value.
+   // there won't be any trimming during upsert in this case.
+   // thus we can avoid the overhead of read-lock and write-lock
+   // in the upsert method.
+   _indexedTable = new UnboundedConcurrentIndexedTable(_dataSchema, _queryContext, _trimSize, _trimThreshold);
+ } else {
+   _indexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, _trimSize, _trimThreshold);
+ }
}
} finally {
_initLock.unlock();
@@ -241,10 +254,10 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
// Set the execution statistics.
CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+ mergedBlock.setNumResizes(_indexedTable.getNumResizes());
+ mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
- if (_indexedTable.size() >= _indexedTableCapacity) {
- mergedBlock.setNumGroupsLimitReached(true);
- }
+ // TODO - set numGroupsLimitReached
return mergedBlock;
} catch (Exception e) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 0bce7c0..fb634c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -61,6 +61,8 @@ public class CombinePlanNode implements PlanNode {
private final long _endTimeMs;
private final int _numGroupsLimit;
private final StreamObserver<Server.ServerResponse> _streamObserver;
+ // used for SQL GROUP BY during server combine
+ private final int _groupByTrimThreshold;
/**
* Constructor for the class.
@@ -71,15 +73,18 @@ public class CombinePlanNode implements PlanNode {
* @param endTimeMs End time in milliseconds for the query
* @param numGroupsLimit Limit of number of groups stored in each segment
* @param streamObserver Optional stream observer for streaming query
+ * @param groupByTrimThreshold trim threshold to use for server combine for SQL GROUP BY
*/
public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService,
-     long endTimeMs, int numGroupsLimit, @Nullable StreamObserver<Server.ServerResponse> streamObserver) {
+     long endTimeMs, int numGroupsLimit, @Nullable StreamObserver<Server.ServerResponse> streamObserver,
+     int groupByTrimThreshold) {
_planNodes = planNodes;
_queryContext = queryContext;
_executorService = executorService;
_endTimeMs = endTimeMs;
_numGroupsLimit = numGroupsLimit;
_streamObserver = streamObserver;
+ _groupByTrimThreshold = groupByTrimThreshold;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -175,7 +180,8 @@ public class CombinePlanNode implements PlanNode {
// Aggregation group-by
QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
if (queryOptions.isGroupByModeSQL()) {
- return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
+ return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs,
+     _groupByTrimThreshold);
}
return new GroupByCombineOperator(operators, _queryContext, _executorService, _endTimeMs, _numGroupsLimit);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 7d77549..1794015 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -62,20 +62,28 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
+ // set as pinot.server.query.executor.groupby.trim.threshold
+ public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
+ public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+
private final int _maxInitialResultHolderCapacity;
// Limit on number of groups stored for each segment, beyond which no new group will be created
private final int _numGroupsLimit;
+ // Used for SQL GROUP BY (server combine)
+ private final int _groupByTrimThreshold;
@VisibleForTesting
public InstancePlanMakerImplV2() {
_maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
_numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
+ _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
}
@VisibleForTesting
public InstancePlanMakerImplV2(int maxInitialResultHolderCapacity, int
numGroupsLimit) {
_maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
_numGroupsLimit = numGroupsLimit;
+ _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
}
/**
@@ -91,6 +99,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
_maxInitialResultHolderCapacity = queryExecutorConfig.getConfig()
    .getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY, DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_numGroupsLimit = queryExecutorConfig.getConfig().getProperty(NUM_GROUPS_LIMIT, DEFAULT_NUM_GROUPS_LIMIT);
+ _groupByTrimThreshold =
+     queryExecutorConfig.getConfig().getProperty(GROUPBY_TRIM_THRESHOLD, DEFAULT_GROUPBY_TRIM_THRESHOLD);
Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
    "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d",
    _maxInitialResultHolderCapacity, _numGroupsLimit);
@@ -106,7 +116,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
}
CombinePlanNode combinePlanNode =
- new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, null);
+ new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, null,
+     _groupByTrimThreshold);
return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
}
@@ -153,7 +164,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext));
}
CombinePlanNode combinePlanNode =
- new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, streamObserver);
+ new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, streamObserver,
+     _groupByTrimThreshold);
return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 0ff1098..7d61015 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -66,10 +66,13 @@ public class BrokerReduceService {
private final ExecutorService _reduceExecutorService;
private final int _maxReduceThreadsPerQuery;
+ private final int _groupByTrimThreshold;
public BrokerReduceService(PinotConfiguration config) {
_maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
    CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
+ _groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
+     CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
@@ -225,8 +228,9 @@ public class BrokerReduceService {
QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(queryContext);
- dataTableReducer.reduceAndSetResults(tableName, cachedDataSchema, dataTableMap, brokerResponseNative,
-     new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs), brokerMetrics);
+ dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
+     new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
+         _groupByTrimThreshold), brokerMetrics);
updateAlias(queryContext, brokerResponseNative);
return brokerResponseNative;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
index 1e29378..d4946df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
@@ -29,6 +29,8 @@ public class DataTableReducerContext {
private final ExecutorService _executorService;
private final int _maxReduceThreadsPerQuery;
private final long _reduceTimeOutMs;
+ // used for SQL GROUP BY
+ private final int _groupByTrimThreshold;
/**
* Constructor for the class.
@@ -36,11 +38,14 @@ public class DataTableReducerContext {
* @param executorService Executor service to use for DataTableReducer
* @param maxReduceThreadsPerQuery Max number of threads to use for reduce phase
* @param reduceTimeOutMs Reduce Phase timeOut in ms
+ * @param groupByTrimThreshold trim threshold for SQL group by
*/
- public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs) {
+ public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs,
+     int groupByTrimThreshold) {
_executorService = executorService;
_maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
_reduceTimeOutMs = reduceTimeOutMs;
+ _groupByTrimThreshold = groupByTrimThreshold;
}
public ExecutorService getExecutorService() {
@@ -54,4 +59,8 @@ public class DataTableReducerContext {
public long getReduceTimeOutMs() {
return _reduceTimeOutMs;
}
+
+ public int getGroupByTrimThreshold() {
+ return _groupByTrimThreshold;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 7cc423a..2ec1161 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -45,6 +45,8 @@ import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
@@ -124,7 +126,8 @@ public class GroupByDataTableReducer implements DataTableReducer {
// This is the primary SQL compliant group by
try {
- setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext);
+ setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
+     brokerMetrics);
} catch (TimeoutException e) {
brokerResponseNative.getProcessingExceptions()
    .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
@@ -181,12 +184,19 @@ public class GroupByDataTableReducer implements DataTableReducer {
* @param dataSchema data schema
* @param dataTables Collection of data tables
* @param reducerContext DataTableReducer context
+ * @param rawTableName table name
+ * @param brokerMetrics broker metrics (meters)
* @throws TimeoutException If unable complete within timeout.
*/
private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
-     Collection<DataTable> dataTables, DataTableReducerContext reducerContext)
+     Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
+     BrokerMetrics brokerMetrics)
    throws TimeoutException {
IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+ if (brokerMetrics != null) {
+   brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+   brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+ }
Iterator<Record> sortedIterator = indexedTable.iterator();
DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
int limit = _queryContext.getLimit();
@@ -262,13 +272,24 @@ public class GroupByDataTableReducer implements DataTableReducer {
int numDataTables = dataTablesToReduce.size();
// Get the number of threads to use for reducing.
- int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
-
// In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
- int capacity = GroupByUtils.getTableCapacity(_queryContext);
- IndexedTable indexedTable =
-     (numReduceThreadsToUse > 1) ? new ConcurrentIndexedTable(dataSchema, _queryContext, capacity)
-         : new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+ int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+ int trimSize = GroupByUtils.getTableCapacity(_queryContext);
+ int trimThreshold = reducerContext.getGroupByTrimThreshold();
+ IndexedTable indexedTable;
+ if (numReduceThreadsToUse <= 1) {
+   indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, trimSize, trimThreshold);
+ } else {
+   if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
+     // special case of trim threshold where it is set to max value.
+     // there won't be any trimming during upsert in this case.
+     // thus we can avoid the overhead of read-lock and write-lock
+     // in the upsert method.
+     indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, trimSize, trimThreshold);
+   } else {
+     indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, trimSize, trimThreshold);
+   }
+ }
+ }
Future[] futures = new Future[numDataTables];
CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 19afc4f..69c43de 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -60,6 +60,8 @@ public abstract class QueryScheduler {
private static final String INVALID_NUM_SCANNED = "-1";
private static final String INVALID_SEGMENTS_COUNT = "-1";
private static final String INVALID_FRESHNESS_MS = "-1";
+ private static final String INVALID_NUM_RESIZES = "-1";
+ private static final String INVALID_RESIZE_TIME_MS = "-1";
private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
@@ -183,6 +185,8 @@ public abstract class QueryScheduler {
    Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT));
long minConsumingFreshnessMs =
    Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS));
+ int numResizes =
+     Integer.parseInt(dataTableMetadata.getOrDefault(DataTable.NUM_RESIZES_METADATA_KEY, INVALID_NUM_RESIZES));
+ long resizeTimeMs =
+     Long.parseLong(dataTableMetadata.getOrDefault(DataTable.RESIZE_TIME_MS_METADATA_KEY, INVALID_RESIZE_TIME_MS));
if (numDocsScanned > 0) {
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -195,6 +199,12 @@ public abstract class QueryScheduler {
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER,
    numEntriesScannedPostFilter);
}
+ if (numResizes > 0) {
+   serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_RESIZES, numResizes);
+ }
+ if (resizeTimeMs > 0) {
+   serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.RESIZE_TIME_MS, resizeTimeMs);
+ }
TimerContext timerContext = queryRequest.getTimerContext();
int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index 166af6d..73fcd87 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -44,6 +44,8 @@ import org.testng.annotations.Test;
@SuppressWarnings({"rawtypes"})
public class IndexedTableTest {
+ private static final int TRIM_THRESHOLD = 20;
+
@Test
public void testConcurrentIndexedTable()
throws InterruptedException, TimeoutException, ExecutionException {
@@ -51,7 +53,7 @@ public class IndexedTableTest {
.getQueryContextFromSQL("SELECT SUM(m1), MAX(m2) FROM testTable GROUP
BY d1, d2, d3 ORDER BY SUM(m1)");
DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3",
"sum(m1)", "max(m2)"},
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
- IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema,
queryContext, 5);
+ IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema,
queryContext, 5, TRIM_THRESHOLD);
// 3 threads upsert together
// a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times
(20)
@@ -64,8 +66,7 @@ public class IndexedTableTest {
Callable<Void> c1 = () -> {
indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new
Object[]{"a", 1, 10d, 10d, 100d}));
indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new
Object[]{"b", 2, 20d, 10d, 200d}));
- indexedTable.upsert(getKey(new Object[]{"c", 3, 30d}),
- getRecord(new Object[]{"c", 3, 30d, 10000d, 300d})); // eviction
candidate
+ indexedTable.upsert(getKey(new Object[]{"c", 3, 30d}), getRecord(new
Object[]{"c", 3, 30d, 10000d, 300d})); // eviction candidate
indexedTable.upsert(getKey(new Object[]{"d", 4, 40d}), getRecord(new
Object[]{"d", 4, 40d, 10d, 400d}));
indexedTable.upsert(getKey(new Object[]{"d", 4, 40d}), getRecord(new
Object[]{"d", 4, 40d, 10d, 400d}));
indexedTable.upsert(getKey(new Object[]{"e", 5, 50d}), getRecord(new
Object[]{"e", 5, 50d, 10d, 500d}));
@@ -74,8 +75,7 @@ public class IndexedTableTest {
Callable<Void> c2 = () -> {
indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new
Object[]{"a", 1, 10d, 10d, 100d}));
- indexedTable.upsert(getKey(new Object[]{"f", 6, 60d}),
- getRecord(new Object[]{"f", 6, 60d, 20000d, 600d})); // eviction
candidate
+ indexedTable.upsert(getKey(new Object[]{"f", 6, 60d}), getRecord(new
Object[]{"f", 6, 60d, 20000d, 600d})); // eviction candidate
indexedTable.upsert(getKey(new Object[]{"g", 7, 70d}), getRecord(new
Object[]{"g", 7, 70d, 10d, 700d}));
indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new
Object[]{"b", 2, 20d, 10d, 200d}));
indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new
Object[]{"b", 2, 20d, 10d, 200d}));
@@ -92,8 +92,7 @@ public class IndexedTableTest {
indexedTable.upsert(getKey(new Object[]{"k", 11, 110d}), getRecord(new
Object[]{"k", 11, 110d, 10d, 1100d}));
indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new
Object[]{"a", 1, 10d, 10d, 100d}));
indexedTable.upsert(getKey(new Object[]{"l", 12, 120d}), getRecord(new
Object[]{"l", 12, 120d, 10d, 1200d}));
- indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}),
- getRecord(new Object[]{"a", 1, 10d, 10d, 100d})); // trimming
candidate
+ indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new
Object[]{"a", 1, 10d, 10d, 100d})); // trimming candidate
indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new
Object[]{"b", 2, 20d, 10d, 200d}));
indexedTable.upsert(getKey(new Object[]{"m", 13, 130d}), getRecord(new
Object[]{"m", 13, 130d, 10d, 1300d}));
indexedTable.upsert(getKey(new Object[]{"n", 14, 140d}), getRecord(new
Object[]{"n", 14, 140d, 10d, 1400d}));
@@ -121,15 +120,15 @@ public class IndexedTableTest {
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE});
// Test SimpleIndexedTable
- IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 5);
- IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext,
10);
+ IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 5, TRIM_THRESHOLD);
+ IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext,
10, TRIM_THRESHOLD);
testNonConcurrent(indexedTable, mergeTable);
indexedTable.finish(true);
checkSurvivors(indexedTable, survivors);
// Test ConcurrentIndexedTable
- indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5);
- mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10);
+ indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5,
TRIM_THRESHOLD);
+ mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10,
TRIM_THRESHOLD);
testNonConcurrent(indexedTable, mergeTable);
indexedTable.finish(true);
checkSurvivors(indexedTable, survivors);
@@ -243,10 +242,10 @@ public class IndexedTableTest {
DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3",
"sum(m1)", "max(m2)"},
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
- IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 5);
+ IndexedTable indexedTable = new SimpleIndexedTable(dataSchema,
queryContext, 5, TRIM_THRESHOLD);
testNoMoreNewRecordsInTable(indexedTable);
- indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5);
+ indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5,
TRIM_THRESHOLD);
testNoMoreNewRecordsInTable(indexedTable);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
index f0358a7..f783f41 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
@@ -175,7 +175,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
recordsMap = new HashMap<>(_recordsMap);
- tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b
assertTrue(recordsMap.containsKey(_keys.get(1)));
@@ -184,7 +184,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
recordsMap = new HashMap<>(_recordsMap);
- tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3
assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -193,7 +193,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, d1"));
recordsMap = new HashMap<>(_recordsMap);
- tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(4))); // 4, 3
assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -202,7 +202,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / (DISTINCTCOUNT(m3) + 1), d1 DESC"));
recordsMap = new HashMap<>(_recordsMap);
- tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
assertEquals(recordsMap.size(), trimToSize);
assertTrue(recordsMap.containsKey(_keys.get(1))); // 3.33, 12.5
assertTrue(recordsMap.containsKey(_keys.get(0)));
@@ -217,21 +217,21 @@ public class TableResizerTest {
TableResizer tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
Map<Key, Record> recordsMap = new HashMap<>(_recordsMap);
- List<Record> sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ List<Record> sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(0)); // a, b
assertEquals(sortedRecords.get(1), _records.get(1));
// d1 asc - trim to 1
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
assertEquals(sortedRecords.get(0), _records.get(0)); // a
// d1 asc, d3 desc (tie breaking with 2nd comparator)
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, d3 DESC"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(0)); // a, b, c (300)
assertEquals(sortedRecords.get(1), _records.get(1));
@@ -239,7 +239,7 @@ public class TableResizerTest {
// d1 asc, d3 desc (tie breaking with 2nd comparator) - trim to 1
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
assertEquals(sortedRecords.size(), 1);
assertEquals(sortedRecords.get(0), _records.get(0)); // a
@@ -247,7 +247,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, SUM(m1) DESC, max(m2) DESC"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(0)); // a, b, c (30, 300)
assertEquals(sortedRecords.get(1), _records.get(1));
@@ -255,7 +255,7 @@ public class TableResizerTest {
// d1 asc, sum(m1) desc, max(m2) desc - trim to 1
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
assertEquals(sortedRecords.size(), 1);
assertEquals(sortedRecords.get(0), _records.get(0)); // a
@@ -263,7 +263,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(4)); // 2, 3, 3.33
assertEquals(sortedRecords.get(1), _records.get(3));
@@ -273,7 +273,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, d1"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(4)); // 4, 3, 2 (b)
assertEquals(sortedRecords.get(1), _records.get(3));
@@ -283,7 +283,7 @@ public class TableResizerTest {
tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / (DISTINCTCOUNT(m3) + 1), d1 DESC"));
recordsMap = new HashMap<>(_recordsMap);
- sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+ sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
assertEquals(sortedRecords.get(0), _records.get(1)); // 3.33, 12.5, 5
assertEquals(sortedRecords.get(1), _records.get(0));
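
The TableResizer changes above boil down to two API points: the tests now consume the map returned by resizeRecordsMap instead of relying on in-place mutation, and resizeAndSortRecordsMap is replaced by sortRecordsMap. A condensed sketch of the new flow using the first test case's ORDER BY (the inline comments restate what the assertions above check, not how the resizer is implemented):

  TableResizer tableResizer = new TableResizer(DATA_SCHEMA,
      QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
  Map<Key, Record> recordsMap = new HashMap<>(_recordsMap);
  // keep only trimToSize groups, chosen according to the ORDER BY
  recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
  // trim to TRIM_TO_SIZE and return the surviving records ordered by the ORDER BY expressions
  List<Record> sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);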
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
index bfbb6c4..6dcf556 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
@@ -94,7 +94,7 @@ public class CombineSlowOperatorsTest {
List<Operator> operators = getOperators();
GroupByOrderByCombineOperator combineOperator = new GroupByOrderByCombineOperator(operators,
QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"),
- _executorService, TIMEOUT_MS);
+ _executorService, TIMEOUT_MS, InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
testCombineOperator(operators, combineOperator);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index e45b441..37ccdd4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -230,7 +230,8 @@ public class SelectionCombineOperatorTest {
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+ InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
return combinePlanNode.run().nextBlock();
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
index 11053c7..2cf4758 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
@@ -58,7 +58,8 @@ public class CombinePlanNodeTest {
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+ InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
combinePlanNode.run();
Assert.assertEquals(numPlans, count.get());
}
@@ -83,7 +84,8 @@ public class CombinePlanNodeTest {
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+ InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
try {
combinePlanNode.run();
} catch (RuntimeException e) {
@@ -105,7 +107,8 @@ public class CombinePlanNodeTest {
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+ InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
try {
combinePlanNode.run();
} catch (RuntimeException e) {
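
Every CombinePlanNode construction in these tests gains a trailing group-by trim threshold argument. The full call shape as exercised here, for reference (the end-of-line notes are descriptive guesses from the call sites, not parameter names from the source):

  CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
      System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS, // query deadline
      InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, // num-groups limit
      null, // unused in these tests; null at every call site above
      InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD); // new: group-by trim threshold
  combinePlanNode.run();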
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 5c5f2b3..7b57721 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
@@ -130,10 +131,11 @@ public class BenchmarkCombineGroupBy {
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void concurrentIndexedTableForCombineGroupBy()
throws InterruptedException, ExecutionException, TimeoutException {
- int capacity = GroupByUtils.getTableCapacity(_queryContext);
+ int trimSize = GroupByUtils.getTableCapacity(_queryContext);
// make 1 concurrent table
- IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, capacity);
+ IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, trimSize,
+ InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 4100420..c466ad6 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -56,7 +56,8 @@ import org.openjdk.jmh.runner.options.TimeValue;
@State(Scope.Benchmark)
public class BenchmarkIndexedTable {
- private static final int CAPACITY = 800;
+ private static final int TRIM_SIZE = 800;
+ private static final int TRIM_THRESHOLD = TRIM_SIZE * 4;
private static final int NUM_RECORDS = 1000;
private static final Random RANDOM = new Random();
@@ -113,7 +114,7 @@ public class BenchmarkIndexedTable {
int numSegments = 10;
// make 1 concurrent table
- IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, CAPACITY);
+ IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, TRIM_SIZE, TRIM_THRESHOLD);
// 10 parallel threads putting 10k records into the table
@@ -161,7 +162,7 @@ public class BenchmarkIndexedTable {
for (int i = 0; i < numSegments; i++) {
// make 10 indexed tables
- IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, _queryContext, CAPACITY);
+ IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, _queryContext, TRIM_SIZE, TRIM_THRESHOLD);
simpleIndexedTables.add(simpleIndexedTable);
// put 10k records in each indexed table, in parallel
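
The benchmark's single CAPACITY constant becomes a TRIM_SIZE/TRIM_THRESHOLD pair, with the threshold set to four times the trim size. Judging by the naming (the benchmark itself does not spell this out), the tables only resize back down to TRIM_SIZE once they grow past TRIM_THRESHOLD, so resizes become less frequent but operate on a larger map:

  private static final int TRIM_SIZE = 800; // groups kept after a resize
  private static final int TRIM_THRESHOLD = TRIM_SIZE * 4; // let the table reach ~3200 groups before resizing down

  IndexedTable table = new ConcurrentIndexedTable(_dataSchema, _queryContext, TRIM_SIZE, TRIM_THRESHOLD);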