npawar commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r332239992
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java ########## @@ -19,37 +19,68 @@ package org.apache.pinot.core.data.table; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nonnull; +import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.common.request.AggregationInfo; import org.apache.pinot.common.request.SelectionSort; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.order.OrderByUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Thread safe {@link Table} implementation for aggregating TableRecords based on combination of keys */ public class ConcurrentIndexedTable extends IndexedTable { + private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIndexedTable.class); + private ConcurrentMap<Key, Record> _lookupMap; - private Comparator<Record> _minHeapComparator; private ReentrantReadWriteLock _readWriteLock; + private boolean _isOrderBy; + private Comparator<Record> _orderByComparator; + private Comparator<Record> _finalOrderByComparator; + private List<Integer> _aggregationIndexes; + + private AtomicBoolean _noMoreNewRecords = new AtomicBoolean(); + private LongAdder _numResizes = new LongAdder(); + private LongAccumulator _resizeTime = new LongAccumulator(Long::sum, 0); + + /** + * Initializes the data structures and comparators needed for this Table + * @param dataSchema data schema of the record's keys and values + * @param aggregationInfos aggregation infors 
for the aggregations in record'd values + * @param orderBy list of {@link SelectionSort} defining the order by + * @param maxCapacity the max number of records to hold + * @param sort does final result need to be sorted + */ @Override public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, - int maxCapacity) { - super.init(dataSchema, aggregationInfos, orderBy, maxCapacity); + int maxCapacity, boolean sort) { + super.init(dataSchema, aggregationInfos, orderBy, maxCapacity, sort); - _minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos).reversed(); _lookupMap = new ConcurrentHashMap<>(); _readWriteLock = new ReentrantReadWriteLock(); + _isOrderBy = CollectionUtils.isNotEmpty(orderBy); + if (_isOrderBy) { + _orderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, false); Review comment: I agree that there is still room for another optimization step; I've listed it under next steps. I have already optimized this by avoiding `extractFinalResults` in the comparison and extracting the final results beforehand, as you suggested. I'd like to do this additional optimization in a later PR to keep the scope of this one from growing too large. Also, the aggregation functions that have an expensive `extractFinalResults` are not intermediate-result comparable anyway, so the redundant extraction only affects count, sum, min, max, avg, and minmax, all of which are fairly lightweight. I will tackle this in the very next PR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org