mayankshriv commented on a change in pull request #6044:
URL: https://github.com/apache/incubator-pinot/pull/6044#discussion_r503640236



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
##########
@@ -231,58 +239,130 @@ private DataSchema 
getPrePostAggregationDataSchema(DataSchema dataSchema) {
     return new DataSchema(columnNames, columnDataTypes);
   }
 
-  private IndexedTable getIndexedTable(DataSchema dataSchema, 
Collection<DataTable> dataTables) {
+  private IndexedTable getIndexedTable(DataSchema dataSchema, 
Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext) {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, 
reducerContext.getMaxReduceThreadsPerQuery());
+
+    // In case of single reduce thread, fall back to SimpleIndexedTable to 
avoid redundant locking/unlocking calls.
     int capacity = GroupByUtils.getTableCapacity(_queryContext);
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, 
_queryContext, capacity);
+    IndexedTable indexedTable =
+        (numReduceThreadsToUse > 1) ? new ConcurrentIndexedTable(dataSchema, 
_queryContext, capacity)
+            : new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+
+    Future[] futures = new Future[numDataTables];
+    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have 
at least one data table.
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new 
ArrayList<>(numReduceThreadsToUse);
+
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    int cnt = 0;
     ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    for (DataTable dataTable : dataTables) {
-      int numRows = dataTable.getNumberOfRows();
-      for (int rowId = 0; rowId < numRows; rowId++) {
-        Object[] values = new Object[_numColumns];
-        for (int colId = 0; colId < _numColumns; colId++) {
-          switch (columnDataTypes[colId]) {
-            case INT:
-              values[colId] = dataTable.getInt(rowId, colId);
-              break;
-            case LONG:
-              values[colId] = dataTable.getLong(rowId, colId);
-              break;
-            case FLOAT:
-              values[colId] = dataTable.getFloat(rowId, colId);
-              break;
-            case DOUBLE:
-              values[colId] = dataTable.getDouble(rowId, colId);
-              break;
-            case STRING:
-              values[colId] = dataTable.getString(rowId, colId);
-              break;
-            case BYTES:
-              values[colId] = dataTable.getBytes(rowId, colId);
-              break;
-            case OBJECT:
-              values[colId] = dataTable.getObject(rowId, colId);
-              break;
-            // Add other aggregation intermediate result / group-by column 
type supports here
-            default:
-              throw new IllegalStateException();
+    for (List<DataTable> reduceGroup : reduceGroups) {
+      futures[cnt++] = reducerContext.getExecutorService().submit(new 
TraceRunnable() {
+        @Override
+        public void runJob() {
+          for (DataTable dataTable : reduceGroup) {
+            int numRows = dataTable.getNumberOfRows();
+
+            try {
+              for (int rowId = 0; rowId < numRows; rowId++) {
+                Object[] values = new Object[_numColumns];
+                for (int colId = 0; colId < _numColumns; colId++) {
+                  switch (columnDataTypes[colId]) {
+                    case INT:
+                      values[colId] = dataTable.getInt(rowId, colId);
+                      break;
+                    case LONG:
+                      values[colId] = dataTable.getLong(rowId, colId);
+                      break;
+                    case FLOAT:
+                      values[colId] = dataTable.getFloat(rowId, colId);
+                      break;
+                    case DOUBLE:
+                      values[colId] = dataTable.getDouble(rowId, colId);
+                      break;
+                    case STRING:
+                      values[colId] = dataTable.getString(rowId, colId);
+                      break;
+                    case BYTES:
+                      values[colId] = dataTable.getBytes(rowId, colId);
+                      break;
+                    case OBJECT:
+                      values[colId] = dataTable.getObject(rowId, colId);
+                      break;
+                    // Add other aggregation intermediate result / group-by 
column type supports here
+                    default:
+                      throw new IllegalStateException();
+                  }
+                }
+                indexedTable.upsert(new Record(values));
+              }
+            } finally {
+              countDownLatch.countDown();
+            }
           }
         }
-        indexedTable.upsert(new Record(values));
+      });
+    }
+
+    try {
+      long timeOutMs = reducerContext.getReduceTimeOutMs() - 
(System.currentTimeMillis() - start);
+      countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
       }
     }
+
     indexedTable.finish(true);
     return indexedTable;
   }
 
+  /**
+   * Computes the number of reduce threads to use per query.
+   * <ul>
+   *   <li> Use single thread if number of data tables to reduce is less than 
{@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
+   *   <li> Else, use min of max allowed reduce threads per query, and number 
of data tables.</li>
+   * </ul>
+   *
+   * @param numDataTables Number of data tables to reduce
+   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
+   * @return Number of reduce threads to use for the query
+   */
+  private int getNumReduceThreadsToUse(int numDataTables, int 
maxReduceThreadsPerQuery) {
+    // Use single thread if number of data tables < 
MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
+    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
+      return Math.min(1, numDataTables); // Number of data tables can be zero.

Review comment:
       Unit test fails, seems numDataTables can be zero.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to