Jackie-Jiang commented on a change in pull request #5451:
URL: https://github.com/apache/incubator-pinot/pull/5451#discussion_r432775396



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java
##########
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.customobject;
+
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * The {@code DistinctTable} class serves as the intermediate result of {@code 
DistinctAggregationFunction}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctTable {
+  private static final int MAX_INITIAL_CAPACITY = 10000;
+
+  private final DataSchema _dataSchema;
+  private final int _limit;
+  private final Set<Record> _uniqueRecords;
+  private final PriorityQueue<Record> _sortedRecords;
+  private final List<Record> _records;
+
+  /**
+   * Constructor of the main {@code DistinctTable} which can be used to add 
records and merge other
+   * {@code DistinctTable}s.
+   */
+  public DistinctTable(DataSchema dataSchema, @Nullable List<SelectionSort> 
orderBy, int limit) {
+    _dataSchema = dataSchema;
+    _limit = limit;
+
+    // TODO: see if 10k is the right max initial capacity to use
+    // NOTE: When LIMIT is smaller than or equal to the MAX_INITIAL_CAPACITY, 
no resize is required.
+    int initialCapacity = Math.min(limit, MAX_INITIAL_CAPACITY);
+    _uniqueRecords = new ObjectOpenHashSet<>(initialCapacity);
+    if (orderBy != null) {
+      String[] columns = dataSchema.getColumnNames();
+      int numColumns = columns.length;
+      Object2IntOpenHashMap<String> columnIndexMap = new 
Object2IntOpenHashMap<>(numColumns);
+      for (int i = 0; i < numColumns; i++) {
+        columnIndexMap.put(columns[i], i);
+      }
+      int numOrderByColumns = orderBy.size();
+      int[] orderByColumnIndexes = new int[numOrderByColumns];
+      boolean[] orderByAsc = new boolean[numOrderByColumns];
+      for (int i = 0; i < numOrderByColumns; i++) {
+        SelectionSort selectionSort = orderBy.get(i);
+        orderByColumnIndexes[i] = 
columnIndexMap.getInt(selectionSort.getColumn());
+        orderByAsc[i] = selectionSort.isIsAsc();
+      }
+      _sortedRecords = new PriorityQueue<>(initialCapacity, (record1, record2) 
-> {
+        Object[] values1 = record1.getValues();
+        Object[] values2 = record2.getValues();
+        for (int i = 0; i < numOrderByColumns; i++) {
+          Comparable valueToCompare1 = (Comparable) 
values1[orderByColumnIndexes[i]];
+          Comparable valueToCompare2 = (Comparable) 
values2[orderByColumnIndexes[i]];
+          int result =
+              orderByAsc[i] ? valueToCompare2.compareTo(valueToCompare1) : 
valueToCompare1.compareTo(valueToCompare2);
+          if (result != 0) {
+            return result;
+          }
+        }
+        return 0;
+      });
+    } else {
+      _sortedRecords = null;
+    }
+    _records = null;
+  }
+
+  /**
+   * Returns the {@code DataSchema} of the {@code DistinctTable}.
+   */
+  public DataSchema getDataSchema() {
+    return _dataSchema;
+  }
+
+  /**
+   * Returns the number of unique records within the {@code DistinctTable}.
+   */
+  public int size() {
+    if (_uniqueRecords != null) {
+      // Server-side
+      return _uniqueRecords.size();
+    } else {
+      // Broker-side
+      return _records.size();
+    }
+  }
+
+  /**
+   * Adds a record into the DistinctTable and returns whether more records 
should be added. No more records should be
+   * added iff there is no order-by column and enough unique records have been 
collected.
+   */
+  public boolean add(Record record) {
+    if (_uniqueRecords.contains(record)) {
+      return true;
+    }
+    if (_sortedRecords != null) {
+      if (_sortedRecords.size() < _limit) {
+        _uniqueRecords.add(record);
+        _sortedRecords.offer(record);
+      } else {
+        Record leastRecord = _sortedRecords.peek();
+        if (_sortedRecords.comparator().compare(record, leastRecord) > 0) {
+          _uniqueRecords.remove(leastRecord);
+          _uniqueRecords.add(record);
+          _sortedRecords.poll();
+          _sortedRecords.offer(record);
+        }
+      }
+      return true;
+    } else {
+      _uniqueRecords.add(record);
+      return _uniqueRecords.size() < _limit;
+    }
+  }
+
+  /**
+   * Returns {@code true} if no more records should be added, {@code false} 
otherwise. No more records should be added
+   * iff there is no order-by column and enough unique records have been 
collected.
+   */
+  public boolean shouldNotAddMore() {
+    return _sortedRecords == null && _uniqueRecords.size() == _limit;
+  }
+
+  /*
+   * SERVER ONLY METHODS
+   */
+
+  /**
+   * (Server-side) Merges another {@code DistinctTable} into the current one.
+   * <p>NOTE: {@code DistinctTable} on Server-side has non-null {@code 
_uniqueRecords}.
+   */
+  public void serverSideMerge(DistinctTable distinctTable) {
+    if (shouldNotAddMore()) {
+      return;
+    }
+    for (Record record : distinctTable._uniqueRecords) {
+      if (!add(record)) {
+        return;
+      }
+    }
+  }
+
+  public byte[] toBytes()
+      throws IOException {
+    // NOTE: Serialize the DistinctTable as a DataTable
+    DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
+    DataSchema.ColumnDataType[] columnDataTypes = 
_dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    for (Record record : _uniqueRecords) {
+      dataTableBuilder.startRow();
+      Object[] values = record.getValues();
+      for (int i = 0; i < numColumns; i++) {
+        switch (columnDataTypes[i]) {
+          case INT:
+            dataTableBuilder.setColumn(i, (int) values[i]);
+            break;
+          case LONG:
+            dataTableBuilder.setColumn(i, (long) values[i]);
+            break;
+          case FLOAT:
+            dataTableBuilder.setColumn(i, (float) values[i]);
+            break;
+          case DOUBLE:
+            dataTableBuilder.setColumn(i, (double) values[i]);
+            break;
+          case STRING:
+            dataTableBuilder.setColumn(i, (String) values[i]);
+            break;
+          case BYTES:
+            dataTableBuilder.setColumn(i, (ByteArray) values[i]);
+            break;
+          // Add other distinct column type supports here
+          default:
+            throw new IllegalStateException();
+        }
+      }
+      dataTableBuilder.finishRow();
+    }
+    return dataTableBuilder.build().toBytes();
+  }
+
+  /*
+   * BROKER ONLY METHODS
+   */
+
+  /**
+   * Broker-side constructor to deserialize the {@code DistinctTable} from a 
{@code ByteBuffer}. The
+   * {@code DistinctTable} constructed this way cannot be used to add records 
or merge other {@code DistinctTable}s
+   * but can only be merged into the main {@code DistinctTable} because the 
order-by and limit information is missing.

Review comment:
       There are 2 types of DistinctTables: Main DistinctTable and Deserialized 
DistinctTable. Added javadoc to explain them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to