npawar commented on a change in pull request #4798: Decouple Key from Record
URL: https://github.com/apache/incubator-pinot/pull/4798#discussion_r343818759
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
 ##########
 @@ -18,75 +18,55 @@
  */
 package org.apache.pinot.core.data.table;
 
-import java.util.Iterator;
+import java.util.Arrays;
 import java.util.List;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 
 
 /**
- * Base abstract implementation of Table for indexed lookup
+ * Base implementation of Map-based Table for indexed lookup
  */
-public abstract class IndexedTable implements Table {
+public abstract class IndexedTable extends BaseTable {
 
-  AggregationFunction[] _aggregationFunctions;
-  int _numAggregations;
-  private DataSchema _dataSchema;
-
-  // the capacity we need to trim to
-  int _capacity;
-  // the capacity with added buffer, in order to collect more records than 
capacity for better precision
-  int _maxCapacity;
-
-  boolean _isOrderBy;
-  IndexedTableResizer _indexedTableResizer;
+  private final KeyExtractor _keyExtractor;
+  final int _numKeyColumns;
 
   /**
    * Initializes the variables and comparators needed for the table
    */
-  public IndexedTable(DataSchema dataSchema, List<AggregationInfo> 
aggregationInfos, List<SelectionSort> orderBy,
-      int capacity) {
-    _dataSchema = dataSchema;
-
-    _numAggregations = aggregationInfos.size();
-    _aggregationFunctions = new AggregationFunction[_numAggregations];
-    for (int i = 0; i < _numAggregations; i++) {
-      _aggregationFunctions[i] =
-          
AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(i)).getAggregationFunction();
-    }
+  IndexedTable(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, 
List<SelectionSort> orderBy, int capacity) {
+    super(dataSchema, aggregationInfos, orderBy, capacity);
 
-    _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
-    if (_isOrderBy) {
-      _indexedTableResizer = new IndexedTableResizer(dataSchema, 
aggregationInfos, orderBy);
-
-      // TODO: tune these numbers
-      // Based on the capacity and maxCapacity, the resizer will smartly 
choose to evict/retain recors from the PQ
-      if (capacity <= 100_000) { // Capacity is small, make a very large 
buffer. Make PQ of records to retain, during resize
-        _maxCapacity = 1_000_000;
-      } else { // Capacity is large, make buffer only slightly bigger. Make PQ 
of records to evict, during resize
-        _maxCapacity = (int) (capacity * 1.2);
-      }
-    } else {
-      _maxCapacity = capacity;
-    }
-    _capacity = capacity;
+    _numKeyColumns = dataSchema.size() - _numAggregations;
+    _keyExtractor = new KeyExtractor(_numKeyColumns);
   }
 
   @Override
-  public boolean merge(Table table) {
-    Iterator<Record> iterator = table.iterator();
-    while (iterator.hasNext()) {
-      upsert(iterator.next());
-    }
-    return true;
+  public boolean upsert(Record newRecord) {
+    Key key = _keyExtractor.extractKey(newRecord);
+    return upsert(key, newRecord);
   }
 
-  @Override
-  public DataSchema getDataSchema() {
-    return _dataSchema;
+  /**
+   * Extractor for key component of a Record
+   * The assumption is that the keys will always be before the aggregations in 
the Record. It is the caller's responsibility to ensure that.
+   * This will help us avoid index lookups, while extracting keys, and also 
while aggregating the values
+   */
+  private static class KeyExtractor {
+    private int _keyIndexes;
+
+    KeyExtractor(int keyIndexes) {
+      _keyIndexes = keyIndexes;
+    }
+
+    /**
+     * Returns the Key from the Record
+     */
+    Key extractKey(Record record) {
+      Object[] keys = Arrays.copyOf(record.getValues(), _keyIndexes);
 
 Review comment:
   Yes, we could do that. But we don't want Key to be constructible only from a 
Record, right? For example, in CombineGroupByOperator, we already have the 
Object[] keys array, so we can just directly create Key(Object[] keys).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to