mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325800046
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
 ##########
 @@ -19,37 +19,65 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nonnull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.order.OrderByUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Thread safe {@link Table} implementation for aggregating TableRecords based 
on combination of keys
  */
 public class ConcurrentIndexedTable extends IndexedTable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConcurrentIndexedTable.class);
+
   private ConcurrentMap<Key, Record> _lookupMap;
-  private Comparator<Record> _minHeapComparator;
   private ReentrantReadWriteLock _readWriteLock;
 
+  private boolean _isOrderBy;
+  private Comparator<Record> _minHeapComparator;
+  private Comparator<Record> _orderByComparator;
+
+  private AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+  private LongAdder _numResizes = new LongAdder();
+  private LongAccumulator _resizeTime = new LongAccumulator(Long::sum, 0);
+
+  /**
+   * Initializes the data structures and comparators needed for this Table
+   * @param dataSchema data schema of the record's keys and values
+   * @param aggregationInfos aggregation infos for the aggregations in 
record's values
+   * @param orderBy list of {@link SelectionSort} defining the order by
+   * @param maxCapacity the max number of records to hold
+   * @param sort does final result need to be sorted
+   */
   @Override
   public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> 
aggregationInfos, List<SelectionSort> orderBy,
-      int maxCapacity) {
-    super.init(dataSchema, aggregationInfos, orderBy, maxCapacity);
+      int maxCapacity, boolean sort) {
+    super.init(dataSchema, aggregationInfos, orderBy, maxCapacity, sort);
 
-    _minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, 
orderBy, aggregationInfos).reversed();
     _lookupMap = new ConcurrentHashMap<>();
     _readWriteLock = new ReentrantReadWriteLock();
+    _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
+    if (_isOrderBy) {
+      _minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, 
orderBy, aggregationInfos).reversed();
 
 Review comment:
   Will this support order-by with ASC/DESC?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to