siddharthteotia commented on a change in pull request #4790: Support ORDER BY for DISTINCT queries
URL: https://github.com/apache/incubator-pinot/pull/4790#discussion_r348749858
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
##########
@@ -39,85 +44,54 @@
* (2) The same object is serialized by the server inside the data table
* for sending the results to broker. Broker deserializes it.
*/
-public class DistinctTable {
+public class DistinctTable extends BaseTable {
private static final double LOAD_FACTOR = 0.75;
private static final int MAX_INITIAL_CAPACITY = 64 * 1024;
- private FieldSpec.DataType[] _columnTypes;
- private String[] _columnNames;
- private Set<Key> _table;
-
- /**
- * Add a row to hash table
- * @param key multi-column key to add
- */
- public void addKey(final Key key) {
- _table.add(key);
- }
+ private Set<Record> _uniqueRecordsSet;
+ private boolean _noMoreNewRecords;
+ private Iterator<Record> _sortedIterator;
- public DistinctTable(int limit) {
+ public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int limit) {
// TODO: see if 64k is the right max initial capacity to use
// if it turns out that users always use LIMIT N > 0.75 * 64k and
// there are indeed that many records, then there will be resizes.
// The current method of setting the initial capacity as
// min(64k, limit/loadFactor) will not require resizes for LIMIT N
// where N <= 48000
+ super(dataSchema, Collections.emptyList(), orderBy, limit);
int initialCapacity = Math.min(MAX_INITIAL_CAPACITY,
Math.abs(nextPowerOfTwo((int) (limit / LOAD_FACTOR))));
- _table = new HashSet<>(initialCapacity);
+ _uniqueRecordsSet = new HashSet<>(initialCapacity);
+ _noMoreNewRecords = false;
}
- /**
- * DESERIALIZE: Broker side
- * @param byteBuffer data to deserialize
- * @throws IOException
- */
- public DistinctTable(ByteBuffer byteBuffer)
- throws IOException {
- DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
- DataSchema dataSchema = dataTable.getDataSchema();
- int numRows = dataTable.getNumberOfRows();
- int numColumns = dataSchema.size();
+ @Override
+ public boolean upsert(Key key, Record record) {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
- _table = new HashSet<>();
+ @Override
+ public boolean upsert(Record newRecord) {
+ if (_noMoreNewRecords) {
+ // for no ORDER BY queries, if we have reached the N as specified
+ // in LIMIT N (or default 10 if user didn't specify anything)
+ // then this function is NOOP
+ return false;
+ }
- // extract rows from the datatable
- for (int rowIndex = 0; rowIndex < numRows; rowIndex++) {
- Object[] columnValues = new Object[numColumns];
- for (int colIndex = 0; colIndex < numColumns; colIndex++) {
- DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colIndex);
- switch (columnDataType) {
- case INT:
- columnValues[colIndex] = dataTable.getInt(rowIndex, colIndex);
- break;
- case LONG:
- columnValues[colIndex] = dataTable.getLong(rowIndex, colIndex);
- break;
- case FLOAT:
- columnValues[colIndex] = dataTable.getFloat(rowIndex, colIndex);
- break;
- case DOUBLE:
- columnValues[colIndex] = dataTable.getDouble(rowIndex, colIndex);
- break;
- case STRING:
- columnValues[colIndex] = dataTable.getString(rowIndex, colIndex);
- break;
- case BYTES:
- columnValues[colIndex] = dataTable.getString(rowIndex, colIndex);
- default:
- throw new IllegalStateException(
- "Unexpected column data type " + columnDataType + " while deserializing data table for DISTINCT query");
- }
- }
+ _uniqueRecordsSet.add(newRecord);
- _table.add(new Key(columnValues));
+ if (_uniqueRecordsSet.size() >= _maxCapacity) {
+ if (_isOrderBy) {
+ // ORDER BY; capacity < maxCapacity so trim to capacity
Review comment:
If there is no ORDER BY, capacity is equal to maxCapacity.
If there is ORDER BY, capacity is less than maxCapacity (this is existing behavior in the
BaseTable implementation and is not changed by this PR).
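The capacity rule in the comment above can be illustrated with a small standalone model. This is a minimal sketch, not Pinot's code: `DistinctTrimSketch`, `row`, `initialCapacity` and `result` are hypothetical names, rows are plain `List<Object>` values instead of Pinot `Record`s, `capacity` stands in for LIMIT N, and `capacity`/`maxCapacity` are passed in directly rather than computed the way `BaseTable` does. It also assumes that `nextPowerOfTwo` rounds up to the smallest power of two, and that with ORDER BY the table sorts and keeps the best `capacity` rows whenever it reaches `maxCapacity`; the diff shows only the comment for that branch, not its body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of the DISTINCT upsert/trim rules from the review comment above.
 * Not Pinot code: rows are List<Object>, and capacity/maxCapacity are passed in directly.
 */
public class DistinctTrimSketch {
  private static final double LOAD_FACTOR = 0.75;
  private static final int MAX_INITIAL_CAPACITY = 64 * 1024;

  private final Comparator<List<Object>> _orderBy; // null means no ORDER BY
  private final int _capacity;                      // stands in for LIMIT N
  private final int _maxCapacity;
  private final Set<List<Object>> _uniqueRecords;
  private boolean _noMoreNewRecords = false;

  public DistinctTrimSketch(Comparator<List<Object>> orderBy, int capacity, int maxCapacity) {
    // Invariant from the review: capacity == maxCapacity without ORDER BY,
    // capacity < maxCapacity with ORDER BY.
    if (orderBy == null ? capacity != maxCapacity : capacity >= maxCapacity) {
      throw new IllegalArgumentException("capacity/maxCapacity invariant violated");
    }
    _orderBy = orderBy;
    _capacity = capacity;
    _maxCapacity = maxCapacity;
    _uniqueRecords = new HashSet<>(initialCapacity(capacity));
  }

  /** min(64k, smallest power of two >= (int) (limit / 0.75)), without int overflow. */
  public static int initialCapacity(int limit) {
    int wanted = (int) (limit / LOAD_FACTOR);
    int pow2 = 1;
    while (pow2 < wanted && pow2 < MAX_INITIAL_CAPACITY) {
      pow2 <<= 1;
    }
    return pow2;
  }

  /** Returns false if the row was ignored because LIMIT is already satisfied (no ORDER BY). */
  public boolean upsert(List<Object> record) {
    if (_noMoreNewRecords) {
      return false;
    }
    _uniqueRecords.add(record); // duplicates are absorbed by the set
    if (_uniqueRecords.size() >= _maxCapacity) {
      if (_orderBy != null) {
        // ORDER BY: capacity < maxCapacity, so sort and keep only the best `capacity` rows.
        List<List<Object>> sorted = new ArrayList<>(_uniqueRecords);
        sorted.sort(_orderBy);
        _uniqueRecords.clear();
        _uniqueRecords.addAll(sorted.subList(0, _capacity));
      } else {
        // No ORDER BY: capacity == maxCapacity, and any N distinct rows satisfy LIMIT N.
        _noMoreNewRecords = true;
      }
    }
    return true;
  }

  /** Final rows: sorted and cut to `capacity` when ORDER BY is present. */
  public List<List<Object>> result() {
    List<List<Object>> rows = new ArrayList<>(_uniqueRecords);
    if (_orderBy != null) {
      rows.sort(_orderBy);
    }
    return new ArrayList<>(rows.subList(0, Math.min(_capacity, rows.size())));
  }

  /** Builds a row from column values. */
  public static List<Object> row(Object... values) {
    return Arrays.asList(values);
  }

  public static void main(String[] args) {
    // ORDER BY first column ASC LIMIT 3, with a made-up maxCapacity of 5.
    Comparator<List<Object>> byFirstColumn = Comparator.comparingInt(r -> (Integer) r.get(0));
    DistinctTrimSketch ordered = new DistinctTrimSketch(byFirstColumn, 3, 5);
    for (int v : new int[] {9, 7, 7, 5, 3, 8, 1, 6}) {
      ordered.upsert(row(v));
    }
    System.out.println(ordered.result()); // [[1], [3], [5]]
  }
}
```

In this sketch, the gap between `capacity` and `maxCapacity` in the ORDER BY case means the sort-and-trim runs once per `maxCapacity - capacity` new distinct rows rather than on every insert. Without ORDER BY no such buffer is needed, because any N distinct rows are a valid answer to LIMIT N, so the table simply stops accepting rows once it is full.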