This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7b0e4  Perf optimization for SQL GROUP BY ORDER BY (#6225)
fa7b0e4 is described below

commit fa7b0e4ea05a02855b87e70b50bfd3f76e6afe02
Author: Sidd <[email protected]>
AuthorDate: Thu Nov 19 02:31:53 2020 -0800

    Perf optimization for SQL GROUP BY ORDER BY (#6225)
    
    * SQL group by order by perf optimization
    
    * cleanup
    
    Co-authored-by: Siddharth Teotia <[email protected]>
---
 .../apache/pinot/common/metrics/BrokerMeter.java   |  3 +
 .../apache/pinot/common/metrics/ServerMeter.java   |  2 +
 .../apache/pinot/common/utils/CommonConstants.java |  4 +
 .../org/apache/pinot/common/utils/DataTable.java   |  2 +
 .../core/data/table/ConcurrentIndexedTable.java    | 74 +++++++++---------
 .../apache/pinot/core/data/table/IndexedTable.java | 49 +++++++-----
 .../pinot/core/data/table/SimpleIndexedTable.java  | 57 +++++++-------
 .../apache/pinot/core/data/table/TableResizer.java | 90 ++++++++--------------
 .../table/UnboundedConcurrentIndexedTable.java     | 80 +++++++++++++++++++
 .../operator/blocks/IntermediateResultsBlock.java  | 20 ++++-
 .../combine/GroupByOrderByCombineOperator.java     | 27 +++++--
 .../apache/pinot/core/plan/CombinePlanNode.java    | 10 ++-
 .../core/plan/maker/InstancePlanMakerImplV2.java   | 16 +++-
 .../core/query/reduce/BrokerReduceService.java     |  8 +-
 .../core/query/reduce/DataTableReducerContext.java | 11 ++-
 .../core/query/reduce/GroupByDataTableReducer.java | 37 +++++++--
 .../pinot/core/query/scheduler/QueryScheduler.java | 10 +++
 .../pinot/core/data/table/IndexedTableTest.java    | 25 +++---
 .../pinot/core/data/table/TableResizerTest.java    | 26 +++----
 .../operator/combine/CombineSlowOperatorsTest.java |  2 +-
 .../combine/SelectionCombineOperatorTest.java      |  3 +-
 .../pinot/core/plan/CombinePlanNodeTest.java       |  9 ++-
 .../apache/pinot/perf/BenchmarkCombineGroupBy.java |  6 +-
 .../apache/pinot/perf/BenchmarkIndexedTable.java   |  7 +-
 24 files changed, 370 insertions(+), 208 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index f295435..ac4129f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -64,6 +64,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   ENTRIES_SCANNED_IN_FILTER("documents", false),
   ENTRIES_SCANNED_POST_FILTER("documents", false),
 
+  NUM_RESIZES("resizes", false),
+  RESIZE_TIME_MS("resizeTimeMs", false),
+
   REQUEST_CONNECTION_TIMEOUTS("timeouts", false),
   HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index c680661..39cc24e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -64,6 +64,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REFRESH_FAILURES("segments", false),
   UNTAR_FAILURES("segments", false),
   SEGMENT_DOWNLOAD_FAILURES("segments", false),
+  NUM_RESIZES("numResizes", false),
+  RESIZE_TIME_MS("resizeTimeMs", false),
 
   // Netty connection metrics
   NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index afb1852..9773e7e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -173,6 +173,10 @@ public class CommonConstants {
     public static final int DEFAULT_MAX_REDUCE_THREADS_PER_QUERY =
        Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2)); // Same logic as CombineOperatorUtils
 
+    // used for SQL GROUP BY during broker reduce
+    public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
+    public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+
     public static class Request {
       public static final String PQL = "pql";
       public static final String SQL = "sql";
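
For reference, the new broker-side threshold introduced above can be overridden in the broker configuration file; the value below is purely illustrative:

    pinot.broker.groupby.trim.threshold=2000000
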
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 527669c..181dc5a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -42,6 +42,8 @@ public interface DataTable {
   String TIME_USED_MS_METADATA_KEY = "timeUsedMs";
   String TRACE_INFO_METADATA_KEY = "traceInfo";
   String REQUEST_ID_METADATA_KEY = "requestId";
+  String NUM_RESIZES_METADATA_KEY = "numResizes";
+  String RESIZE_TIME_MS_METADATA_KEY = "resizeTimeMs";
 
   void addException(ProcessingException processingException);
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 2ff4ccd..1bfe913 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -39,17 +39,16 @@ import org.slf4j.LoggerFactory;
 public class ConcurrentIndexedTable extends IndexedTable {
  private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIndexedTable.class);
 
-  private final ConcurrentMap<Key, Record> _lookupMap;
-  private final ReentrantReadWriteLock _readWriteLock;
+  protected volatile ConcurrentMap<Key, Record> _lookupMap;
+  protected final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
   private Iterator<Record> _iterator;
-
-  private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+  private final ReentrantReadWriteLock _readWriteLock;
   private final AtomicInteger _numResizes = new AtomicInteger();
-  private final AtomicLong _resizeTime = new AtomicLong();
-
-  public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
-    super(dataSchema, queryContext, capacity);
+  private final AtomicLong _resizeTimeMs = new AtomicLong();
 
+  public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext queryContext, int trimSize,
+      int trimThreshold) {
+    super(dataSchema, queryContext, trimSize, trimThreshold);
     _lookupMap = new ConcurrentHashMap<>();
     _readWriteLock = new ReentrantReadWriteLock();
   }
@@ -59,9 +58,7 @@ public class ConcurrentIndexedTable extends IndexedTable {
    */
   @Override
   public boolean upsert(Key key, Record newRecord) {
-
     Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
-
     if (_noMoreNewRecords.get()) { // allow only existing record updates
       _lookupMap.computeIfPresent(key, (k, v) -> {
         Object[] existingValues = v.getValues();
@@ -93,14 +90,14 @@ public class ConcurrentIndexedTable extends IndexedTable {
         _readWriteLock.readLock().unlock();
       }
 
-      // resize if exceeds max capacity
-      if (_lookupMap.size() >= _maxCapacity) {
+      // resize if exceeds trim threshold
+      if (_lookupMap.size() >= _trimThreshold) {
         if (_hasOrderBy) {
           // reached capacity, resize
           _readWriteLock.writeLock().lock();
           try {
-            if (_lookupMap.size() >= _maxCapacity) {
-              resize(_capacity);
+            if (_lookupMap.size() >= _trimThreshold) {
+              resize(_trimSize);
             }
           } finally {
             _readWriteLock.writeLock().unlock();
@@ -116,7 +113,7 @@ public class ConcurrentIndexedTable extends IndexedTable {
 
   @Override
   public int size() {
-    return _lookupMap.size();
+    return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
   }
 
   @Override
@@ -125,52 +122,55 @@ public class ConcurrentIndexedTable extends IndexedTable {
   }
 
   private void resize(int trimToSize) {
-
     long startTime = System.currentTimeMillis();
-
-    _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
-
+    // When the resizer trims using a PQ, it returns a new trimmed map, so the
+    // reference held by the indexed table needs to be updated. This is also why
+    // _lookupMap is volatile: the thread doing the resize installs a new reference.
+    _lookupMap = (ConcurrentMap)_tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
     long endTime = System.currentTimeMillis();
     long timeElapsed = endTime - startTime;
-
     _numResizes.incrementAndGet();
-    _resizeTime.addAndGet(timeElapsed);
+    _resizeTimeMs.addAndGet(timeElapsed);
   }
 
   private List<Record> resizeAndSort(int trimToSize) {
-
     long startTime = System.currentTimeMillis();
-
-    List<Record> sortedRecords = _tableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
-
+    List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap, trimToSize);
     long endTime = System.currentTimeMillis();
     long timeElapsed = endTime - startTime;
-
     _numResizes.incrementAndGet();
-    _resizeTime.addAndGet(timeElapsed);
-
+    _resizeTimeMs.addAndGet(timeElapsed);
     return sortedRecords;
   }
 
   @Override
   public void finish(boolean sort) {
-
     if (_hasOrderBy) {
-
       if (sort) {
-        List<Record> sortedRecords = resizeAndSort(_capacity);
-        _iterator = sortedRecords.iterator();
+        _sortedRecords = resizeAndSort(_trimSize);
+        _iterator = _sortedRecords.iterator();
       } else {
-        resize(_capacity);
+        resize(_trimSize);
       }
       int numResizes = _numResizes.get();
-      long resizeTime = _resizeTime.get();
-      LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", numResizes, resizeTime,
-          numResizes == 0 ? 0 : resizeTime / numResizes);
+      long resizeTime = _resizeTimeMs.get();
+      LOGGER.debug(
+          "Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}, trimSize: {}, trimThreshold: {}",
+          numResizes, resizeTime, numResizes == 0 ? 0 : resizeTime / numResizes, _trimSize, _trimThreshold);
     }
-
     if (_iterator == null) {
       _iterator = _lookupMap.values().iterator();
     }
   }
+
+  @Override
+  public int getNumResizes() {
+    return _numResizes.get();
+  }
+
+  @Override
+  public long getResizeTimeMs() {
+    return _resizeTimeMs.get();
+  }
 }
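
A minimal, self-contained sketch of the locking scheme above (illustrative names, not part of this patch): upserts run under the read lock so they can proceed concurrently, while a trim takes the write lock and re-checks the size, since another thread may have already trimmed while this one was waiting. The map reference is volatile because the trim may install a brand-new map.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockedUpsertSketch {
      private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
      private volatile ConcurrentMap<String, Long> _map = new ConcurrentHashMap<>();
      private final int _trimThreshold = 1000;
      private final int _trimSize = 100;

      public void upsert(String key, long value) {
        _lock.readLock().lock();
        try {
          _map.merge(key, value, Long::sum);  // atomic per-key merge
        } finally {
          _lock.readLock().unlock();
        }
        if (_map.size() >= _trimThreshold) {
          _lock.writeLock().lock();
          try {
            if (_map.size() >= _trimThreshold) {  // re-check under the write lock
              _map = trim(_map, _trimSize);       // may install a new map, hence volatile
            }
          } finally {
            _lock.writeLock().unlock();
          }
        }
      }

      // Stand-in for TableResizer.resizeRecordsMap: keep the `size` largest values.
      private ConcurrentMap<String, Long> trim(ConcurrentMap<String, Long> map, int size) {
        ConcurrentMap<String, Long> trimmed = new ConcurrentHashMap<>();
        map.entrySet().stream()
            .sorted((a, b) -> Long.compare(b.getValue(), a.getValue()))
            .limit(size)
            .forEach(e -> trimmed.put(e.getKey(), e.getValue()));
        return trimmed;
      }
    }
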
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index 5867256..04feade 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -36,40 +36,43 @@ public abstract class IndexedTable extends BaseTable {
   protected final AggregationFunction[] _aggregationFunctions;
   protected final boolean _hasOrderBy;
   protected final TableResizer _tableResizer;
+  protected List<Record> _sortedRecords;
+  // The size we need to trim to
+  protected final int _trimSize;
+  // The size with an added buffer, in order to collect more records than trimSize for better precision
+  protected final int _trimThreshold;
 
-  // The capacity we need to trim to
-  protected final int _capacity;
-  // The capacity with added buffer, in order to collect more records than capacity for better precision
-  protected final int _maxCapacity;
-
-  protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
+  protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int trimSize, int trimThreshold) {
     super(dataSchema);
-
    List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
     assert groupByExpressions != null;
     _numKeyColumns = groupByExpressions.size();
-
     _aggregationFunctions = queryContext.getAggregationFunctions();
-
    List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
     if (orderByExpressions != null) {
+      // SQL GROUP BY with ORDER BY
+      // trimSize = max(limit N * 5, 5000) (see GroupByUtils.getTableCapacity).
+      // trimSize is also bounded by trimThreshold/2 to protect the server in case
+      // the user specifies a very high value of LIMIT N.
+      // trimThreshold is configurable. To keep parity with PQL for some use
+      // cases with infinitely large group by, trimThreshold will be >= 1B
+      // (exactly the same as PQL). This essentially implies there will be no
+      // resizing/trimming during upsert and exactly one trim during finish.
       _hasOrderBy = true;
       _tableResizer = new TableResizer(dataSchema, queryContext);
-      _capacity = capacity;
-
-      // TODO: tune these numbers and come up with a better formula (github ISSUE-4801)
-      // Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain recors from the PQ
-      if (capacity
-          <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize
-        _maxCapacity = 1_000_000;
-      } else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize
-        _maxCapacity = (int) (capacity * 1.2);
-      }
+      _trimSize = Math.min(trimSize, trimThreshold / 2);
+      _trimThreshold = trimThreshold;
     } else {
+      // SQL GROUP BY without ORDER BY
+      // trimSize = LIMIT N (see GroupByUtils.getTableCapacity)
+      // trimThreshold is the same as trimSize since the indexed table stops
+      // accepting new records once the map size reaches trimSize. There is no
+      // resize/trim during upsert since the results can be arbitrary
+      // and are truncated once they reach trimSize.
       _hasOrderBy = false;
       _tableResizer = null;
-      _capacity = capacity;
-      _maxCapacity = capacity;
+      _trimSize = trimSize;
+      _trimThreshold = trimSize;
     }
   }
 
@@ -80,4 +83,8 @@ public abstract class IndexedTable extends BaseTable {
     Object[] keyValues = Arrays.copyOf(record.getValues(), _numKeyColumns);
     return upsert(new Key(keyValues), record);
   }
+
+  public abstract int getNumResizes();
+
+  public abstract long getResizeTimeMs();
 }
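
To make the sizing above concrete, here is a small standalone sketch (names hypothetical, not Pinot API) of how trimSize and trimThreshold relate for the ORDER BY case, using the formula the comments cite from GroupByUtils.getTableCapacity:

    public class TrimSizingSketch {
      // Mirrors the cited formula: max(limit N * 5, 5000)
      static int tableCapacity(int limit) {
        return Math.max(limit * 5, 5000);
      }

      public static void main(String[] args) {
        int limit = 10;                 // LIMIT N in the query
        int trimThreshold = 1_000_000;  // configurable; >= 1B disables upsert-time trimming
        int trimSize = Math.min(tableCapacity(limit), trimThreshold / 2);
        // Records accumulate until the map holds trimThreshold entries,
        // then the table trims back down to trimSize.
        System.out.println("trimSize=" + trimSize + ", trimThreshold=" + trimThreshold);
      }
    }
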
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index 05b5359..a7e133a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -37,16 +37,15 @@ import org.slf4j.LoggerFactory;
 public class SimpleIndexedTable extends IndexedTable {
  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIndexedTable.class);
 
-  private final Map<Key, Record> _lookupMap;
+  private Map<Key, Record> _lookupMap;
   private Iterator<Record> _iterator;
 
   private boolean _noMoreNewRecords = false;
   private int _numResizes = 0;
-  private long _resizeTime = 0;
-
-  public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int capacity) {
-    super(dataSchema, queryContext, capacity);
+  private long _resizeTimeMs = 0;
 
+  public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, int trimSize, int trimThreshold) {
+    super(dataSchema, queryContext, trimSize, trimThreshold);
     _lookupMap = new HashMap<>();
   }
 
@@ -56,7 +55,6 @@ public class SimpleIndexedTable extends IndexedTable {
   @Override
   public boolean upsert(Key key, Record newRecord) {
     Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
-
     if (_noMoreNewRecords) { // allow only existing record updates
       _lookupMap.computeIfPresent(key, (k, v) -> {
         Object[] existingValues = v.getValues();
@@ -83,10 +81,10 @@ public class SimpleIndexedTable extends IndexedTable {
         }
       });
 
-      if (_lookupMap.size() >= _maxCapacity) {
+      if (_lookupMap.size() >= _trimThreshold) {
         if (_hasOrderBy) {
           // reached max capacity, resize
-          resize(_capacity);
+          resize(_trimSize);
         } else {
          // reached max capacity and no order by. No more new records will be accepted
           _noMoreNewRecords = true;
@@ -97,36 +95,27 @@ public class SimpleIndexedTable extends IndexedTable {
   }
 
   private void resize(int trimToSize) {
-
     long startTime = System.currentTimeMillis();
-
-    _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
-
+    _lookupMap = _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
     long endTime = System.currentTimeMillis();
     long timeElapsed = endTime - startTime;
-
     _numResizes++;
-    _resizeTime += timeElapsed;
+    _resizeTimeMs += timeElapsed;
   }
 
   private List<Record> resizeAndSort(int trimToSize) {
-
     long startTime = System.currentTimeMillis();
-
-    List<Record> sortedRecords = _tableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
-
+    List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap, trimToSize);
     long endTime = System.currentTimeMillis();
     long timeElapsed = endTime - startTime;
-
     _numResizes++;
-    _resizeTime += timeElapsed;
-
+    _resizeTimeMs += timeElapsed;
     return sortedRecords;
   }
 
   @Override
   public int size() {
-    return _lookupMap.size();
+    return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
   }
 
   @Override
@@ -136,22 +125,28 @@ public class SimpleIndexedTable extends IndexedTable {
 
   @Override
   public void finish(boolean sort) {
-
     if (_hasOrderBy) {
-
       if (sort) {
-        List<Record> sortedRecords = resizeAndSort(_capacity);
-        _iterator = sortedRecords.iterator();
+        _sortedRecords = resizeAndSort(_trimSize);
+        _iterator = _sortedRecords.iterator();
       } else {
-        resize(_capacity);
+        resize(_trimSize);
       }
-      LOGGER
-          .debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", _numResizes, _resizeTime,
-              _numResizes == 0 ? 0 : _resizeTime / _numResizes);
+      LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}, trimSize: {}, trimThreshold: {}",
+          _numResizes, _resizeTimeMs, _numResizes == 0 ? 0 : _resizeTimeMs / _numResizes, _trimSize, _trimThreshold);
     }
-
     if (_iterator == null) {
       _iterator = _lookupMap.values().iterator();
     }
   }
+
+  @Override
+  public int getNumResizes() {
+    return _numResizes;
+  }
+
+  @Override
+  public long getResizeTimeMs() {
+    return _resizeTimeMs;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index 256f8bd..2b1470f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -136,39 +138,48 @@ public class TableResizer {
    * Resize only if number of records is greater than trimToSize
   * The resizer smartly chooses to create PQ of records to evict or records to retain, based on the number of records and the number of records to evict
    */
-  public void resizeRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+  public Map<Key, Record> resizeRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
     int numRecordsToEvict = recordsMap.size() - trimToSize;
-
     if (numRecordsToEvict > 0) {
      // TODO: compare the performance of converting to IntermediateRecord vs keeping Record, in cases where we do not need to extract final results
-
-      if (numRecordsToEvict < trimToSize) { // num records to evict is smaller than num records to retain
+      if (numRecordsToEvict < trimToSize) {
+        // num records to evict is smaller than num records to retain
         // make PQ of records to evict
         PriorityQueue<IntermediateRecord> priorityQueue =
            convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, _intermediateRecordComparator);
         for (IntermediateRecord evictRecord : priorityQueue) {
           recordsMap.remove(evictRecord._key);
         }
-      } else { // num records to retain is smaller than num records to evict
+        return recordsMap;
+      } else {
+        // num records to retain is smaller than num records to evict
         // make PQ of records to retain
+        // TODO - Consider reusing the same map by removing record from the map
+        // at the time it is evicted from PQ
+        Map<Key, Record> trimmedRecordsMap;
+        if (recordsMap instanceof ConcurrentMap) {
+          // invoked by ConcurrentIndexedTable
+          trimmedRecordsMap = new ConcurrentHashMap<>();
+        } else {
+          // invoked by SimpleIndexedTable
+          trimmedRecordsMap = new HashMap<>();
+        }
        Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed();
         PriorityQueue<IntermediateRecord> priorityQueue =
             convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
-        ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(priorityQueue.size());
-        for (IntermediateRecord retainRecord : priorityQueue) {
-          keysToRetain.add(retainRecord._key);
+        for (IntermediateRecord recordToRetain : priorityQueue) {
+          trimmedRecordsMap.put(recordToRetain._key, recordsMap.get(recordToRetain._key));
         }
-        recordsMap.keySet().retainAll(keysToRetain);
+        return trimmedRecordsMap;
       }
     }
+    return recordsMap;
   }
 
  private PriorityQueue<IntermediateRecord> convertToIntermediateRecordsPQ(Map<Key, Record> recordsMap, int size,
       Comparator<IntermediateRecord> comparator) {
    PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator);
-
     for (Map.Entry<Key, Record> entry : recordsMap.entrySet()) {
-
      IntermediateRecord intermediateRecord = getIntermediateRecord(entry.getKey(), entry.getValue());
       if (priorityQueue.size() < size) {
         priorityQueue.offer(intermediateRecord);
@@ -183,62 +194,25 @@ public class TableResizer {
     return priorityQueue;
   }
 
-  private List<Record> sortRecordsMap(Map<Key, Record> recordsMap) {
-    int numRecords = recordsMap.size();
-    List<Record> sortedRecords = new ArrayList<>(numRecords);
-    List<IntermediateRecord> intermediateRecords = new ArrayList<>(numRecords);
-    for (Map.Entry<Key, Record> entry : recordsMap.entrySet()) {
-      intermediateRecords.add(getIntermediateRecord(entry.getKey(), entry.getValue()));
-    }
-    intermediateRecords.sort(_intermediateRecordComparator);
-    for (IntermediateRecord intermediateRecord : intermediateRecords) {
-      sortedRecords.add(recordsMap.get(intermediateRecord._key));
-    }
-    return sortedRecords;
-  }
-
   /**
-   * Resizes the recordsMap and returns a sorted list of records.
+   * Sorts the recordsMap using a priority queue and returns a sorted list of records
    * This method is to be called from IndexedTable::finish, if both resize and sort is needed
-   *
-   * If numRecordsToEvict > numRecordsToRetain, resize with PQ of records to evict, and then sort
-   * Else, resize with PQ of record to retain, then use the PQ to create sorted list
    */
-  public List<Record> resizeAndSortRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+  public List<Record> sortRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
     int numRecords = recordsMap.size();
     if (numRecords == 0) {
       return Collections.emptyList();
     }
-
     int numRecordsToRetain = Math.min(numRecords, trimToSize);
-    int numRecordsToEvict = numRecords - numRecordsToRetain;
-
-    if (numRecordsToEvict < numRecordsToRetain) { // num records to evict is smaller than num records to retain
-      if (numRecordsToEvict > 0) {
-        // make PQ of records to evict
-        PriorityQueue<IntermediateRecord> priorityQueue =
-            convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, _intermediateRecordComparator);
-        for (IntermediateRecord evictRecord : priorityQueue) {
-          recordsMap.remove(evictRecord._key);
-        }
-      }
-      return sortRecordsMap(recordsMap);
-    } else {
-      // make PQ of records to retain
-      PriorityQueue<IntermediateRecord> priorityQueue =
-          convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed());
-      // use PQ to get sorted list
-      Record[] sortedArray = new Record[numRecordsToRetain];
-      ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(numRecordsToRetain);
-      while (!priorityQueue.isEmpty()) {
-        IntermediateRecord intermediateRecord = priorityQueue.poll();
-        keysToRetain.add(intermediateRecord._key);
-        Record record = recordsMap.get(intermediateRecord._key);
-        sortedArray[--numRecordsToRetain] = record;
-      }
-      recordsMap.keySet().retainAll(keysToRetain);
-      return Arrays.asList(sortedArray);
+    // make PQ of sorted records to retain
+    PriorityQueue<IntermediateRecord> priorityQueue = convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed());
+    Record[] sortedArray = new Record[numRecordsToRetain];
+    while (!priorityQueue.isEmpty()) {
+      IntermediateRecord intermediateRecord = priorityQueue.poll();
+      Record record = recordsMap.get(intermediateRecord._key);
+      sortedArray[--numRecordsToRetain] = record;
     }
+    return Arrays.asList(sortedArray);
   }
 
   /**
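
The strategy above always heaps whichever side is smaller, so the priority queue never holds more than half the map. A self-contained sketch of the same idea (plain Java types, not Pinot code), where `cmp` ranks entries so that "greater" means worse, i.e. evicted first:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.PriorityQueue;

    public class ResizeSketch {
      // Bounded min-heap under cmp; after the scan it holds the n greatest
      // elements (the same pattern as convertToIntermediateRecordsPQ).
      static <T> PriorityQueue<T> topN(Iterable<T> items, int n, Comparator<T> cmp) {
        PriorityQueue<T> pq = new PriorityQueue<>(n, cmp);
        for (T item : items) {
          if (pq.size() < n) {
            pq.offer(item);
          } else if (cmp.compare(pq.peek(), item) < 0) {
            pq.poll();
            pq.offer(item);
          }
        }
        return pq;
      }

      static Map<String, Long> resize(Map<String, Long> map, int trimToSize,
          Comparator<Map.Entry<String, Long>> cmp) {
        int numToEvict = map.size() - trimToSize;
        if (numToEvict <= 0) {
          return map;  // already small enough
        }
        if (numToEvict < trimToSize) {
          // Few evictions: heap the numToEvict worst entries and remove them in place.
          for (Map.Entry<String, Long> e : topN(map.entrySet(), numToEvict, cmp)) {
            map.remove(e.getKey());
          }
          return map;
        }
        // Few survivors: heap the trimToSize best entries and copy them to a new map.
        Map<String, Long> trimmed = new HashMap<>();
        for (Map.Entry<String, Long> e : topN(map.entrySet(), trimToSize, cmp.reversed())) {
          trimmed.put(e.getKey(), e.getValue());
        }
        return trimmed;
      }
    }
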
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
new file mode 100644
index 0000000..18d30cf
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * A version of {@link ConcurrentIndexedTable} used for use cases configured to
+ * have an effectively infinite group by (num groups limit set to 1B or higher).
+ * For such cases, there won't be any resizing/trimming during upsert since
+ * trimThreshold is very high. Thus, we can avoid the overhead of the read lock's
+ * lock and unlock operations, which are otherwise used in ConcurrentIndexedTable
+ * to prevent race conditions between two threads doing upsert where one of them
+ * ends up trimming from upsert. For use cases with a very large number of groups,
+ * we noticed that the lock-unlock overhead was > 1 sec; this specialized
+ * concurrent indexed table avoids that by overriding just the upsert method.
+ */
+public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
+
+  public UnboundedConcurrentIndexedTable(DataSchema dataSchema,
+      QueryContext queryContext, int trimSize, int trimThreshold) {
+    super(dataSchema, queryContext, trimSize, trimThreshold);
+  }
+
+  @Override
+  public boolean upsert(Key key, Record newRecord) {
+    if (_noMoreNewRecords.get()) {
+      // allow only existing record updates
+      _lookupMap.computeIfPresent(key, (k, v) -> {
+        Object[] existingValues = v.getValues();
+        Object[] newValues = newRecord.getValues();
+        int aggNum = 0;
+        for (int i = _numKeyColumns; i < _numColumns; i++) {
+          existingValues[i] = _aggregationFunctions[aggNum++].merge(existingValues[i], newValues[i]);
+        }
+        return v;
+      });
+    } else {
+      // allow all records
+      _lookupMap.compute(key, (k, v) -> {
+        if (v == null) {
+          return newRecord;
+        } else {
+          Object[] existingValues = v.getValues();
+          Object[] newValues = newRecord.getValues();
+          int aggNum = 0;
+          for (int i = _numKeyColumns; i < _numColumns; i++) {
+            existingValues[i] = _aggregationFunctions[aggNum++].merge(existingValues[i], newValues[i]);
+          }
+          return v;
+        }
+      });
+
+      if (_lookupMap.size() >= _trimSize && !_hasOrderBy) {
+        // reached capacity and no order by. No more new records will be accepted
+        _noMoreNewRecords.set(true);
+      }
+    }
+    return true;
+  }
+}
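
The unlocked path is safe here because ConcurrentHashMap.compute runs atomically per key, so concurrent upserts of the same group cannot interleave their merges; a table-wide lock is only needed when a trim can swap the map out underneath writers, which never happens with an unbounded threshold. A tiny self-contained demonstration of that per-key atomicity (illustrative, not Pinot code):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ComputeMergeSketch {
      public static void main(String[] args) throws InterruptedException {
        ConcurrentMap<String, long[]> groups = new ConcurrentHashMap<>();
        Runnable upsert = () -> {
          for (int i = 0; i < 100_000; i++) {
            groups.compute("key", (k, v) -> {
              if (v == null) {
                return new long[]{1};
              }
              v[0]++;  // merge step, analogous to AggregationFunction.merge
              return v;
            });
          }
        };
        Thread t1 = new Thread(upsert);
        Thread t2 = new Thread(upsert);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(groups.get("key")[0]);  // always 200000
      }
    }
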
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 79ac520..09784fa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -40,10 +40,13 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.Table;
+import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -64,6 +67,8 @@ public class IntermediateResultsBlock implements Block {
   private int _numSegmentsProcessed;
   private int _numSegmentsMatched;
   private boolean _numGroupsLimitReached;
+  private int _numResizes;
+  private long _resizeTimeMs;
 
   private Table _table;
 
@@ -218,6 +223,14 @@ public class IntermediateResultsBlock implements Block {
     _numGroupsLimitReached = numGroupsLimitReached;
   }
 
+  public void setNumResizes(int numResizes) {
+    _numResizes = numResizes;
+  }
+
+  public void setResizeTimeMs(long resizeTimeMs) {
+    _resizeTimeMs = resizeTimeMs;
+  }
+
   @VisibleForTesting
   public long getNumDocsScanned() {
     return _numDocsScanned;
@@ -278,17 +291,16 @@ public class IntermediateResultsBlock implements Block {
 
   private DataTable getResultDataTable()
       throws IOException {
-
     DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
 
     Iterator<Record> iterator = _table.iterator();
+    ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
     while (iterator.hasNext()) {
       Record record = iterator.next();
       dataTableBuilder.startRow();
       int columnIndex = 0;
       for (Object value : record.getValues()) {
-        ColumnDataType columnDataType = _dataSchema.getColumnDataType(columnIndex);
-        setDataTableColumn(columnDataType, dataTableBuilder, columnIndex, value);
+        setDataTableColumn(columnDataTypes[columnIndex], dataTableBuilder, columnIndex, value);
         columnIndex++;
       }
       dataTableBuilder.finishRow();
@@ -416,6 +428,8 @@ public class IntermediateResultsBlock implements Block {
        .put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, String.valueOf(_numEntriesScannedPostFilter));
    dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_PROCESSED, String.valueOf(_numSegmentsProcessed));
    dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_MATCHED, String.valueOf(_numSegmentsMatched));
+    dataTable.getMetadata().put(DataTable.NUM_RESIZES_METADATA_KEY, String.valueOf(_numResizes));
+    dataTable.getMetadata().put(DataTable.RESIZE_TIME_MS_METADATA_KEY, String.valueOf(_resizeTimeMs));
 
    dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(_numTotalDocs));
     if (_numGroupsLimitReached) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 37af764..0d5ce5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -62,24 +63,27 @@ import org.slf4j.LoggerFactory;
 public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResultsBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
   private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator";
+  public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
 
   private final List<Operator> _operators;
   private final QueryContext _queryContext;
   private final ExecutorService _executorService;
   private final long _endTimeMs;
-  private final int _indexedTableCapacity;
+  private final int _trimSize;
+  private final int _trimThreshold;
   private final Lock _initLock;
   private DataSchema _dataSchema;
   private ConcurrentIndexedTable _indexedTable;
 
   public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
-      ExecutorService executorService, long endTimeMs) {
+      ExecutorService executorService, long endTimeMs, int trimThreshold) {
     _operators = operators;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
     _initLock = new ReentrantLock();
-    _indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext);
+    _trimSize = GroupByUtils.getTableCapacity(_queryContext);
+    _trimThreshold = trimThreshold;
   }
 
   /**
@@ -140,7 +144,16 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
             try {
               if (_dataSchema == null) {
                 _dataSchema = intermediateResultsBlock.getDataSchema();
-                _indexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, _indexedTableCapacity);
+                if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+                  // special case of trim threshold where it is set to max value.
+                  // there won't be any trimming during upsert in this case.
+                  // thus we can avoid the overhead of read-lock and write-lock
+                  // in the upsert method.
+                  _indexedTable = new UnboundedConcurrentIndexedTable(_dataSchema, _queryContext,
+                      _trimSize, _trimThreshold);
+                } else {
+                  _indexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, _trimSize, _trimThreshold);
+                }
               }
             } finally {
               _initLock.unlock();
@@ -241,10 +254,10 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
 
       // Set the execution statistics.
       CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators);
+      mergedBlock.setNumResizes(_indexedTable.getNumResizes());
+      mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
 
-      if (_indexedTable.size() >= _indexedTableCapacity) {
-        mergedBlock.setNumGroupsLimitReached(true);
-      }
+      // TODO - set numGroupsLimitReached
 
       return mergedBlock;
     } catch (Exception e) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 0bce7c0..fb634c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -61,6 +61,8 @@ public class CombinePlanNode implements PlanNode {
   private final long _endTimeMs;
   private final int _numGroupsLimit;
   private final StreamObserver<Server.ServerResponse> _streamObserver;
+  // used for SQL GROUP BY during server combine
+  private final int _groupByTrimThreshold;
 
   /**
    * Constructor for the class.
@@ -71,15 +73,18 @@ public class CombinePlanNode implements PlanNode {
    * @param endTimeMs End time in milliseconds for the query
    * @param numGroupsLimit Limit of number of groups stored in each segment
    * @param streamObserver Optional stream observer for streaming query
+   * @param groupByTrimThreshold trim threshold to use for server combine for SQL GROUP BY
    */
   public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService,
-      long endTimeMs, int numGroupsLimit, @Nullable StreamObserver<Server.ServerResponse> streamObserver) {
+      long endTimeMs, int numGroupsLimit, @Nullable StreamObserver<Server.ServerResponse> streamObserver,
+      int groupByTrimThreshold) {
     _planNodes = planNodes;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
     _numGroupsLimit = numGroupsLimit;
     _streamObserver = streamObserver;
+    _groupByTrimThreshold = groupByTrimThreshold;
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
@@ -175,7 +180,8 @@ public class CombinePlanNode implements PlanNode {
         // Aggregation group-by
        QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
         if (queryOptions.isGroupByModeSQL()) {
-          return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
+          return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs,
+              _groupByTrimThreshold);
         }
        return new GroupByCombineOperator(operators, _queryContext, _executorService, _endTimeMs, _numGroupsLimit);
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 7d77549..1794015 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -62,20 +62,28 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
 
+  // set as pinot.server.query.executor.groupby.trim.threshold
+  public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
+  public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+
   private final int _maxInitialResultHolderCapacity;
  // Limit on number of groups stored for each segment, beyond which no new group will be created
   private final int _numGroupsLimit;
+  // Used for SQL GROUP BY (server combine)
+  private final int _groupByTrimThreshold;
 
   @VisibleForTesting
   public InstancePlanMakerImplV2() {
    _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
     _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
+    _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
   }
 
   @VisibleForTesting
  public InstancePlanMakerImplV2(int maxInitialResultHolderCapacity, int numGroupsLimit) {
     _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
     _numGroupsLimit = numGroupsLimit;
+    _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
   }
 
   /**
@@ -91,6 +99,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
    _maxInitialResultHolderCapacity = queryExecutorConfig.getConfig()
        .getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY, DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
    _numGroupsLimit = queryExecutorConfig.getConfig().getProperty(NUM_GROUPS_LIMIT, DEFAULT_NUM_GROUPS_LIMIT);
+    _groupByTrimThreshold =
+        queryExecutorConfig.getConfig().getProperty(GROUPBY_TRIM_THRESHOLD, DEFAULT_GROUPBY_TRIM_THRESHOLD);
    Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
        "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d",
         _maxInitialResultHolderCapacity, _numGroupsLimit);
@@ -106,7 +116,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
       planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
     }
     CombinePlanNode combinePlanNode =
-        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, null);
+        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, null,
+            _groupByTrimThreshold);
     return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
   }
 
@@ -153,7 +164,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
       planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext));
     }
     CombinePlanNode combinePlanNode =
-        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, streamObserver);
+        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, streamObserver,
+            _groupByTrimThreshold);
     return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 0ff1098..7d61015 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -66,10 +66,13 @@ public class BrokerReduceService {
 
   private final ExecutorService _reduceExecutorService;
   private final int _maxReduceThreadsPerQuery;
+  private final int _groupByTrimThreshold;
 
   public BrokerReduceService(PinotConfiguration config) {
    _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
        CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
+    _groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
+        CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
 
    int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
    LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
@@ -225,8 +228,9 @@ public class BrokerReduceService {
 
    QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
    DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(queryContext);
-    dataTableReducer.reduceAndSetResults(tableName, cachedDataSchema, dataTableMap, brokerResponseNative,
-        new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs), brokerMetrics);
+    dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative,
+        new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
+            _groupByTrimThreshold), brokerMetrics);
     updateAlias(queryContext, brokerResponseNative);
     return brokerResponseNative;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
index 1e29378..d4946df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducerContext.java
@@ -29,6 +29,8 @@ public class DataTableReducerContext {
   private final ExecutorService _executorService;
   private final int _maxReduceThreadsPerQuery;
   private final long _reduceTimeOutMs;
+  // used for SQL GROUP BY
+  private final int _groupByTrimThreshold;
 
   /**
    * Constructor for the class.
@@ -36,11 +38,14 @@ public class DataTableReducerContext {
    * @param executorService Executor service to use for DataTableReducer
   * @param maxReduceThreadsPerQuery Max number of threads to use for reduce phase
    * @param reduceTimeOutMs Reduce Phase timeOut in ms
+   * @param groupByTrimThreshold trim threshold for SQL group by
    */
-  public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs) {
+  public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs,
+      int groupByTrimThreshold) {
     _executorService = executorService;
     _maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
     _reduceTimeOutMs = reduceTimeOutMs;
+    _groupByTrimThreshold = groupByTrimThreshold;
   }
 
   public ExecutorService getExecutorService() {
@@ -54,4 +59,8 @@ public class DataTableReducerContext {
   public long getReduceTimeOutMs() {
     return _reduceTimeOutMs;
   }
+
+  public int getGroupByTrimThreshold() {
+    return _groupByTrimThreshold;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 7cc423a..2ec1161 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -45,6 +45,8 @@ import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
@@ -124,7 +126,8 @@ public class GroupByDataTableReducer implements DataTableReducer {
         // This is the primary SQL compliant group by
 
         try {
-          setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext);
+          setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
+              brokerMetrics);
         } catch (TimeoutException e) {
           brokerResponseNative.getProcessingExceptions()
              .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
@@ -181,12 +184,19 @@ public class GroupByDataTableReducer implements DataTableReducer {
    * @param dataSchema data schema
    * @param dataTables Collection of data tables
    * @param reducerContext DataTableReducer context
+   * @param rawTableName table name
+   * @param brokerMetrics broker metrics (meters)
    * @throws TimeoutException If unable complete within timeout.
    */
  private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
-      Collection<DataTable> dataTables, DataTableReducerContext reducerContext)
+      Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
+      BrokerMetrics brokerMetrics)
       throws TimeoutException {
    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+    if (brokerMetrics != null) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+    }
     Iterator<Record> sortedIterator = indexedTable.iterator();
    DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
     int limit = _queryContext.getLimit();
@@ -262,13 +272,24 @@ public class GroupByDataTableReducer implements DataTableReducer {
     int numDataTables = dataTablesToReduce.size();
 
     // Get the number of threads to use for reducing.
-    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
-
     // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
-    int capacity = GroupByUtils.getTableCapacity(_queryContext);
-    IndexedTable indexedTable =
-        (numReduceThreadsToUse > 1) ? new ConcurrentIndexedTable(dataSchema, _queryContext, capacity)
-            : new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+    int trimSize = GroupByUtils.getTableCapacity(_queryContext);
+    int trimThreshold = reducerContext.getGroupByTrimThreshold();
+    IndexedTable indexedTable;
+    if (numReduceThreadsToUse <= 1) {
+      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, trimSize, trimThreshold);
+    } else {
+      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
+        // special case of trim threshold where it is set to max value.
+        // there won't be any trimming during upsert in this case.
+        // thus we can avoid the overhead of read-lock and write-lock
+        // in the upsert method.
+        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, trimSize, trimThreshold);
+      } else {
+        indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, trimSize, trimThreshold);
+      }
+    }
 
     Future[] futures = new Future[numDataTables];
     CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
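
Condensing the selection logic above into one helper (a sketch that assumes the classes introduced by this patch, not code from it):

    // Single reduce thread -> SimpleIndexedTable (no locking at all).
    // Multiple threads with the threshold maxed out -> UnboundedConcurrentIndexedTable
    // (no trim can happen during upsert, so no read/write lock either).
    // Otherwise -> ConcurrentIndexedTable.
    static IndexedTable chooseIndexedTable(DataSchema dataSchema, QueryContext queryContext,
        int numReduceThreads, int trimSize, int trimThreshold) {
      if (numReduceThreads <= 1) {
        return new SimpleIndexedTable(dataSchema, queryContext, trimSize, trimThreshold);
      }
      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
        return new UnboundedConcurrentIndexedTable(dataSchema, queryContext, trimSize, trimThreshold);
      }
      return new ConcurrentIndexedTable(dataSchema, queryContext, trimSize, trimThreshold);
    }
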
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 19afc4f..69c43de 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -60,6 +60,8 @@ public abstract class QueryScheduler {
   private static final String INVALID_NUM_SCANNED = "-1";
   private static final String INVALID_SEGMENTS_COUNT = "-1";
   private static final String INVALID_FRESHNESS_MS = "-1";
+  private static final String INVALID_NUM_RESIZES = "-1";
+  private static final String INVALID_RESIZE_TIME_MS = "-1";
  private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
   private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
 
@@ -183,6 +185,8 @@ public abstract class QueryScheduler {
        Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT));
    long minConsumingFreshnessMs =
        Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS));
+    int numResizes = Integer.parseInt(dataTableMetadata.getOrDefault(DataTable.NUM_RESIZES_METADATA_KEY, INVALID_NUM_RESIZES));
+    long resizeTimeMs = Long.parseLong(dataTableMetadata.getOrDefault(DataTable.RESIZE_TIME_MS_METADATA_KEY, INVALID_RESIZE_TIME_MS));
 
     if (numDocsScanned > 0) {
      serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -195,6 +199,12 @@ public abstract class QueryScheduler {
      serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER,
           numEntriesScannedPostFilter);
     }
+    if (numResizes > 0) {
+      serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_RESIZES, numResizes);
+    }
+    if (resizeTimeMs > 0) {
+      serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.RESIZE_TIME_MS, resizeTimeMs);
+    }
 
     TimerContext timerContext = queryRequest.getTimerContext();
     int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index 166af6d..73fcd87 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -44,6 +44,8 @@ import org.testng.annotations.Test;
 @SuppressWarnings({"rawtypes"})
 public class IndexedTableTest {
 
+  private static final int TRIM_THRESHOLD = 20;
+
   @Test
   public void testConcurrentIndexedTable()
       throws InterruptedException, TimeoutException, ExecutionException {
@@ -51,7 +53,7 @@ public class IndexedTableTest {
        .getQueryContextFromSQL("SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3 ORDER BY SUM(m1)");
    DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"},
        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
-    IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5);
+    IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_THRESHOLD);
 
     // 3 threads upsert together
    // a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times (20)
@@ -64,8 +66,7 @@ public class IndexedTableTest {
       Callable<Void> c1 = () -> {
        indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
        indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new Object[]{"b", 2, 20d, 10d, 200d}));
-        indexedTable.upsert(getKey(new Object[]{"c", 3, 30d}),
-            getRecord(new Object[]{"c", 3, 30d, 10000d, 300d})); // eviction candidate
+        indexedTable.upsert(getKey(new Object[]{"c", 3, 30d}), getRecord(new Object[]{"c", 3, 30d, 10000d, 300d})); // eviction candidate
        indexedTable.upsert(getKey(new Object[]{"d", 4, 40d}), getRecord(new Object[]{"d", 4, 40d, 10d, 400d}));
        indexedTable.upsert(getKey(new Object[]{"d", 4, 40d}), getRecord(new Object[]{"d", 4, 40d, 10d, 400d}));
        indexedTable.upsert(getKey(new Object[]{"e", 5, 50d}), getRecord(new Object[]{"e", 5, 50d, 10d, 500d}));
@@ -74,8 +75,7 @@ public class IndexedTableTest {
 
       Callable<Void> c2 = () -> {
         indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
-        indexedTable.upsert(getKey(new Object[]{"f", 6, 60d}),
-            getRecord(new Object[]{"f", 6, 60d, 20000d, 600d})); // eviction candidate
+        indexedTable.upsert(getKey(new Object[]{"f", 6, 60d}), getRecord(new Object[]{"f", 6, 60d, 20000d, 600d})); // eviction candidate
         indexedTable.upsert(getKey(new Object[]{"g", 7, 70d}), getRecord(new Object[]{"g", 7, 70d, 10d, 700d}));
         indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new Object[]{"b", 2, 20d, 10d, 200d}));
         indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new Object[]{"b", 2, 20d, 10d, 200d}));
@@ -92,8 +92,7 @@ public class IndexedTableTest {
         indexedTable.upsert(getKey(new Object[]{"k", 11, 110d}), getRecord(new Object[]{"k", 11, 110d, 10d, 1100d}));
         indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new Object[]{"a", 1, 10d, 10d, 100d}));
         indexedTable.upsert(getKey(new Object[]{"l", 12, 120d}), getRecord(new Object[]{"l", 12, 120d, 10d, 1200d}));
-        indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}),
-            getRecord(new Object[]{"a", 1, 10d, 10d, 100d})); // trimming candidate
+        indexedTable.upsert(getKey(new Object[]{"a", 1, 10d}), getRecord(new Object[]{"a", 1, 10d, 10d, 100d})); // trimming candidate
         indexedTable.upsert(getKey(new Object[]{"b", 2, 20d}), getRecord(new Object[]{"b", 2, 20d, 10d, 200d}));
         indexedTable.upsert(getKey(new Object[]{"m", 13, 130d}), getRecord(new Object[]{"m", 13, 130d, 10d, 1300d}));
         indexedTable.upsert(getKey(new Object[]{"n", 14, 140d}), getRecord(new Object[]{"n", 14, 140d, 10d, 1400d}));
@@ -121,15 +120,15 @@ public class IndexedTableTest {
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
 
     // Test SimpleIndexedTable
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5);
-    IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10);
+    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5, TRIM_THRESHOLD);
+    IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10, TRIM_THRESHOLD);
     testNonConcurrent(indexedTable, mergeTable);
     indexedTable.finish(true);
     checkSurvivors(indexedTable, survivors);
 
     // Test ConcurrentIndexedTable
-    indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5);
-    mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10);
+    indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_THRESHOLD);
+    mergeTable = new SimpleIndexedTable(dataSchema, queryContext, 10, TRIM_THRESHOLD);
     testNonConcurrent(indexedTable, mergeTable);
     indexedTable.finish(true);
     checkSurvivors(indexedTable, survivors);
@@ -243,10 +242,10 @@ public class IndexedTableTest {
     DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"},
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
 
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5);
+    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, queryContext, 5, TRIM_THRESHOLD);
     testNoMoreNewRecordsInTable(indexedTable);
 
-    indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5);
+    indexedTable = new ConcurrentIndexedTable(dataSchema, queryContext, 5, TRIM_THRESHOLD);
     testNoMoreNewRecordsInTable(indexedTable);
   }
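
The tests above now pass two sizing arguments instead of one: a trim size (5
or 10) and TRIM_THRESHOLD (20). This suggests the table is allowed to grow up
to the threshold before being resized down to the trim size, amortizing the
cost of ordering across many upserts. A hedged sketch of that contract under
those assumptions (the resize policy and names here are illustrative, not the
actual IndexedTable internals):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class TrimSketch {
      private final int _trimSize;       // size to shrink to on a resize (5 above)
      private final int _trimThreshold;  // growth allowed before resizing (20 above)
      private final List<Double> _values = new ArrayList<>();
      private int _numResizes;

      TrimSketch(int trimSize, int trimThreshold) {
        _trimSize = trimSize;
        _trimThreshold = trimThreshold;
      }

      void add(double orderByValue) {
        _values.add(orderByValue);
        if (_values.size() >= _trimThreshold) {
          // Keep only the best trimSize entries per the order-by (ascending here).
          _values.sort(Comparator.naturalOrder());
          _values.subList(_trimSize, _values.size()).clear();
          _numResizes++;
        }
      }

      public static void main(String[] args) {
        TrimSketch table = new TrimSketch(5, 20);
        for (int i = 0; i < 100; i++) {
          table.add(Math.random());
        }
        System.out.println("size=" + table._values.size() + ", resizes=" + table._numResizes);
      }
    }
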
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
index f0358a7..f783f41 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
@@ -175,7 +175,7 @@ public class TableResizerTest {
     tableResizer =
         new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
     recordsMap = new HashMap<>(_recordsMap);
-    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b
     assertTrue(recordsMap.containsKey(_keys.get(1)));
@@ -184,7 +184,7 @@ public class TableResizerTest {
     tableResizer =
         new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
     recordsMap = new HashMap<>(_recordsMap);
-    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3
     assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -193,7 +193,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, d1"));
     recordsMap = new HashMap<>(_recordsMap);
-    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(4))); // 4, 3
     assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -202,7 +202,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / (DISTINCTCOUNT(m3) + 1), d1 DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(1))); // 3.33, 12.5
     assertTrue(recordsMap.containsKey(_keys.get(0)));
@@ -217,21 +217,21 @@ public class TableResizerTest {
     TableResizer tableResizer =
         new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
     Map<Key, Record> recordsMap = new HashMap<>(_recordsMap);
-    List<Record> sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    List<Record> sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a, b
     assertEquals(sortedRecords.get(1), _records.get(1));
 
     // d1 asc - trim to 1
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a
 
     // d1 asc, d3 desc (tie breaking with 2nd comparator)
     tableResizer =
         new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, d3 DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a, b, c (300)
     assertEquals(sortedRecords.get(1), _records.get(1));
@@ -239,7 +239,7 @@ public class TableResizerTest {
 
     // d1 asc, d3 desc (tie breaking with 2nd comparator) - trim to 1
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
     assertEquals(sortedRecords.size(), 1);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a
 
@@ -247,7 +247,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, SUM(m1) DESC, max(m2) DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a, b, c (30, 300)
     assertEquals(sortedRecords.get(1), _records.get(1));
@@ -255,7 +255,7 @@ public class TableResizerTest {
 
     // d1 asc, sum(m1) desc, max(m2) desc - trim to 1
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
     assertEquals(sortedRecords.size(), 1);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a
 
@@ -263,7 +263,7 @@ public class TableResizerTest {
     tableResizer =
         new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(4));  // 2, 3, 3.33
     assertEquals(sortedRecords.get(1), _records.get(3));
@@ -273,7 +273,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, d1"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(4));  // 4, 3, 2 (b)
     assertEquals(sortedRecords.get(1), _records.get(3));
@@ -283,7 +283,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / (DISTINCTCOUNT(m3) + 1), d1 DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.resizeAndSortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(1));  // 3.33, 12.5, 5
     assertEquals(sortedRecords.get(1), _records.get(0));
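
Two API shifts are visible in this test file: resizeRecordsMap now returns
the (possibly new) map rather than mutating its argument in place, and
resizeAndSortRecordsMap has become sortRecordsMap. A minimal sketch of the
first pattern and why callers must reassign (types simplified to strings and
ints; the real method works on Map<Key, Record> with the order-by
comparator):

    import java.util.HashMap;
    import java.util.Map;

    public class ResizeReturnSketch {
      static Map<String, Integer> resizeRecordsMap(Map<String, Integer> recordsMap, int trimToSize) {
        if (recordsMap.size() <= trimToSize) {
          return recordsMap;  // small enough already, nothing to trim
        }
        Map<String, Integer> trimmed = new HashMap<>();
        recordsMap.entrySet().stream()
            .sorted(Map.Entry.comparingByValue())  // stand-in for the order-by comparator
            .limit(trimToSize)
            .forEach(e -> trimmed.put(e.getKey(), e.getValue()));
        return trimmed;
      }

      public static void main(String[] args) {
        Map<String, Integer> recordsMap = new HashMap<>(Map.of("a", 1, "b", 2, "c", 3));
        // Reassign, as the updated assertions do; the original map is untouched.
        recordsMap = resizeRecordsMap(recordsMap, 2);
        System.out.println(recordsMap);  // keys "a" and "b" survive
      }
    }
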
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
index bfbb6c4..6dcf556 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java
@@ -94,7 +94,7 @@ public class CombineSlowOperatorsTest {
     List<Operator> operators = getOperators();
     GroupByOrderByCombineOperator combineOperator = new GroupByOrderByCombineOperator(operators,
         QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"),
-        _executorService, TIMEOUT_MS);
+        _executorService, TIMEOUT_MS, InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
     testCombineOperator(operators, combineOperator);
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index e45b441..37ccdd4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -230,7 +230,8 @@ public class SelectionCombineOperatorTest {
     }
     CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR,
         System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
-        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+        InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
     return combinePlanNode.run().nextBlock();
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
index 11053c7..2cf4758 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
@@ -58,7 +58,8 @@ public class CombinePlanNodeTest {
       }
       CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
           System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
-          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+          InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
       combinePlanNode.run();
       Assert.assertEquals(numPlans, count.get());
     }
@@ -83,7 +84,8 @@ public class CombinePlanNodeTest {
     }
     CombinePlanNode combinePlanNode =
         new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100,
-            InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+            InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+            InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
     try {
       combinePlanNode.run();
     } catch (RuntimeException e) {
@@ -105,7 +107,8 @@ public class CombinePlanNodeTest {
     }
     CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
         System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
-        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
+        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null,
+        InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
     try {
       combinePlanNode.run();
     } catch (RuntimeException e) {
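
The CombinePlanNode and combine-operator changes in this commit are the same
mechanical threading: every call site gains a trailing group-by trim
threshold, with tests passing
InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD explicitly. A hedged
sketch of that parameter threading (constructor shape simplified and the
default value invented for illustration; it is not the real constant):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CombineNodeSketch {
      // Illustrative stand-in for InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD.
      static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;

      private final int _groupByTrimThreshold;

      CombineNodeSketch(ExecutorService executor, long endTimeMs, int numGroupsLimit,
          int groupByTrimThreshold) {
        _groupByTrimThreshold = groupByTrimThreshold;
      }

      public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Tests pass the default explicitly, mirroring the hunks above.
        CombineNodeSketch node = new CombineNodeSketch(executor,
            System.currentTimeMillis() + 10_000, 100_000, DEFAULT_GROUPBY_TRIM_THRESHOLD);
        System.out.println("threshold=" + node._groupByTrimThreshold);
        executor.shutdown();
      }
    }
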
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 5c5f2b3..7b57721 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
@@ -130,10 +131,11 @@ public class BenchmarkCombineGroupBy {
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void concurrentIndexedTableForCombineGroupBy()
       throws InterruptedException, ExecutionException, TimeoutException {
-    int capacity = GroupByUtils.getTableCapacity(_queryContext);
+    int trimSize = GroupByUtils.getTableCapacity(_queryContext);
 
     // make 1 concurrent table
-    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, capacity);
+    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, trimSize,
+        InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD);
 
     List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
 
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 4100420..c466ad6 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -56,7 +56,8 @@ import org.openjdk.jmh.runner.options.TimeValue;
 
 @State(Scope.Benchmark)
 public class BenchmarkIndexedTable {
-  private static final int CAPACITY = 800;
+  private static final int TRIM_SIZE = 800;
+  private static final int TRIM_THRESHOLD = TRIM_SIZE * 4;
   private static final int NUM_RECORDS = 1000;
   private static final Random RANDOM = new Random();
 
@@ -113,7 +114,7 @@ public class BenchmarkIndexedTable {
     int numSegments = 10;
 
     // make 1 concurrent table
-    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, CAPACITY);
+    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(_dataSchema, _queryContext, TRIM_SIZE, TRIM_THRESHOLD);
 
     // 10 parallel threads putting 10k records into the table
 
@@ -161,7 +162,7 @@ public class BenchmarkIndexedTable {
     for (int i = 0; i < numSegments; i++) {
 
       // make 10 indexed tables
-      IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, _queryContext, CAPACITY);
+      IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, _queryContext, TRIM_SIZE, TRIM_THRESHOLD);
       simpleIndexedTables.add(simpleIndexedTable);
 
       // put 10k records in each indexed table, in parallel
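
Both benchmarks split the old single CAPACITY into an explicit trim size plus
a larger trim threshold (4x here), trading some memory for fewer resizes. A
small sketch of the sizing arithmetic, assuming a getTableCapacity-style
derivation from the query limit (the factor and floor below are illustrative
assumptions, not the actual GroupByUtils logic):

    public class TableSizingSketch {
      public static void main(String[] args) {
        int limit = 200;  // LIMIT of the group-by order-by query

        // Illustrative stand-in for GroupByUtils.getTableCapacity(_queryContext).
        int trimSize = Math.max(limit * 5, 5000);

        // Mirrors TRIM_THRESHOLD = TRIM_SIZE * 4 above: let the table grow to
        // 4x the trim size before paying for a resize.
        int trimThreshold = trimSize * 4;

        System.out.println("trimSize=" + trimSize + ", trimThreshold=" + trimThreshold);
      }
    }
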


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
