This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4967780802 Configure final reduce phase threads for heavy aggregation 
functions (#14662)
4967780802 is described below

commit 496778080232f7bff961dbaa56dca4c2b6f1f9b5
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jan 24 05:34:04 2025 +0800

    Configure final reduce phase threads for heavy aggregation functions 
(#14662)
    
    * Configure final reduce phase threads for heavy aggregation functions
    
    * Address comments
    
    * Add tests with numThreadsForFinalReduce
---
 .../common/utils/config/QueryOptionsUtils.java     |  13 +++
 .../core/data/table/ConcurrentIndexedTable.java    |   5 +-
 .../apache/pinot/core/data/table/IndexedTable.java | 101 +++++++++++++++++++--
 .../pinot/core/data/table/SimpleIndexedTable.java  |   6 +-
 .../table/UnboundedConcurrentIndexedTable.java     |   6 +-
 .../operator/combine/GroupByCombineOperator.java   |   3 +-
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  17 ++++
 .../core/query/reduce/GroupByDataTableReducer.java |   2 +-
 .../core/query/request/context/QueryContext.java   |  21 +++++
 .../org/apache/pinot/core/util/GroupByUtils.java   |  35 ++++---
 .../accounting/ResourceManagerAccountingTest.java  |   3 +-
 .../pinot/core/data/table/IndexedTableTest.java    |  19 ++--
 .../pinot/integration/tests/custom/ArrayTest.java  |   4 +-
 .../integration/tests/custom/BytesTypeTest.java    |   6 +-
 .../integration/tests/custom/CpcSketchTest.java    |   5 +-
 .../CustomDataQueryClusterIntegrationTest.java     |  16 ++--
 .../tests/custom/FloatingPointDataTypeTest.java    |   4 +-
 .../integration/tests/custom/GeoSpatialTest.java   |   5 +-
 .../integration/tests/custom/JsonPathTest.java     |   4 +-
 .../tests/custom/MapFieldTypeRealtimeTest.java     |   4 +-
 .../integration/tests/custom/MapFieldTypeTest.java |   4 +-
 .../integration/tests/custom/MapTypeTest.java      |   4 +-
 .../integration/tests/custom/SumPrecisionTest.java |   5 +-
 .../integration/tests/custom/TextIndicesTest.java  |   4 +-
 .../integration/tests/custom/ThetaSketchTest.java  |   4 +-
 .../integration/tests/custom/TimestampTest.java    |   5 +-
 .../integration/tests/custom/TupleSketchTest.java  |   5 +-
 .../pinot/integration/tests/custom/ULLTest.java    |   5 +-
 .../pinot/integration/tests/custom/VectorTest.java |   4 +-
 .../integration/tests/custom/WindowFunnelTest.java |  65 +++++++++++--
 .../apache/pinot/perf/BenchmarkCombineGroupBy.java |   2 +-
 .../apache/pinot/perf/BenchmarkIndexedTable.java   |   4 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  11 ++-
 33 files changed, 314 insertions(+), 87 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5f88a9691c..5e8ba86643 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -248,6 +248,19 @@ public class QueryOptionsUtils {
     return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, 
groupByTrimThreshold);
   }
 
+  @Nullable
+  public static Integer getNumThreadsExtractFinalResult(Map<String, String> 
queryOptions) {
+    String numThreadsExtractFinalResultString = 
queryOptions.get(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT);
+    return checkedParseInt(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT, 
numThreadsExtractFinalResultString, 1);
+  }
+
+  @Nullable
+  public static Integer getChunkSizeExtractFinalResult(Map<String, String> 
queryOptions) {
+    String chunkSizeExtractFinalResultString =
+        queryOptions.get(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+    return checkedParseInt(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT, 
chunkSizeExtractFinalResultString, 1);
+  }
+
   public static boolean isNullHandlingEnabled(Map<String, String> 
queryOptions) {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 871eea7c26..fd75284324 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.pinot.common.utils.DataSchema;
@@ -33,9 +34,9 @@ public class ConcurrentIndexedTable extends IndexedTable {
   private final ReentrantReadWriteLock _readWriteLock = new 
ReentrantReadWriteLock();
 
   public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, 
QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, int initialCapacity) {
+      int trimSize, int trimThreshold, int initialCapacity, ExecutorService 
executorService) {
     super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, 
trimThreshold,
-        new ConcurrentHashMap<>(initialCapacity));
+        new ConcurrentHashMap<>(initialCapacity), executorService);
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index bce224eb3a..d96854ccb5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -19,17 +19,24 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
 
 
 /**
@@ -37,6 +44,7 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class IndexedTable extends BaseTable {
+  private final ExecutorService _executorService;
   protected final Map<Key, Record> _lookupMap;
   protected final boolean _hasFinalInput;
   protected final int _resultSize;
@@ -46,6 +54,8 @@ public abstract class IndexedTable extends BaseTable {
   protected final TableResizer _tableResizer;
   protected final int _trimSize;
   protected final int _trimThreshold;
+  protected final int _numThreadsExtractFinalResult;
+  protected final int _chunkSizeExtractFinalResult;
 
   protected Collection<Record> _topRecords;
   private int _numResizes;
@@ -63,13 +73,14 @@ public abstract class IndexedTable extends BaseTable {
    * @param lookupMap     Map from keys to records
    */
   protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, 
QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, Map<Key, Record> lookupMap) {
+      int trimSize, int trimThreshold, Map<Key, Record> lookupMap, 
ExecutorService executorService) {
     super(dataSchema);
 
     Preconditions.checkArgument(resultSize >= 0, "Result size can't be 
negative");
     Preconditions.checkArgument(trimSize >= 0, "Trim size can't be negative");
     Preconditions.checkArgument(trimThreshold >= 0, "Trim threshold can't be 
negative");
 
+    _executorService = executorService;
     _lookupMap = lookupMap;
     _hasFinalInput = hasFinalInput;
     _resultSize = resultSize;
@@ -84,6 +95,10 @@ public abstract class IndexedTable extends BaseTable {
     assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == 
Integer.MAX_VALUE);
     _trimSize = trimSize;
     _trimThreshold = trimThreshold;
+    // NOTE: The upper limit of threads number for final reduce is set to 2 * 
number of available processors by default
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
   }
 
   @Override
@@ -157,14 +172,88 @@ public abstract class IndexedTable extends BaseTable {
       for (int i = 0; i < numAggregationFunctions; i++) {
         columnDataTypes[i + _numKeyColumns] = 
_aggregationFunctions[i].getFinalResultColumnType();
       }
-      for (Record record : _topRecords) {
-        Object[] values = record.getValues();
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          int colId = i + _numKeyColumns;
-          values[colId] = 
_aggregationFunctions[i].extractFinalResult(values[colId]);
+      int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+      // Submit task when the EXECUTOR_SERVICE is not overloaded
+      if (numThreadsExtractFinalResult > 1) {
+        // Multi-threaded final reduce
+        List<Future<Void>> futures = new 
ArrayList<>(numThreadsExtractFinalResult);
+        try {
+          List<Record> topRecordsList = new ArrayList<>(_topRecords);
+          int chunkSize = (topRecordsList.size() + 
numThreadsExtractFinalResult - 1) / numThreadsExtractFinalResult;
+          for (int threadId = 0; threadId < numThreadsExtractFinalResult; 
threadId++) {
+            int startIdx = threadId * chunkSize;
+            int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
+            if (startIdx < endIdx) {
+              // Submit a task for processing a chunk of values
+              futures.add(_executorService.submit(new TraceCallable<Void>() {
+                @Override
+                public Void callJob() {
+                  for (int recordIdx = startIdx; recordIdx < endIdx; 
recordIdx++) {
+                    Object[] values = 
topRecordsList.get(recordIdx).getValues();
+                    for (int i = 0; i < numAggregationFunctions; i++) {
+                      int colId = i + _numKeyColumns;
+                      values[colId] = 
_aggregationFunctions[i].extractFinalResult(values[colId]);
+                    }
+                  }
+                  return null;
+                }
+              }));
+            }
+          }
+          // Wait for all tasks to complete
+          for (Future<Void> future : futures) {
+            future.get();
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          // Cancel all running tasks
+          for (Future<Void> future : futures) {
+            future.cancel(true);
+          }
+          throw new RuntimeException("Error during multi-threaded final 
reduce", e);
         }
+      } else {
+        for (Record record : _topRecords) {
+          Object[] values = record.getValues();
+          for (int i = 0; i < numAggregationFunctions; i++) {
+            int colId = i + _numKeyColumns;
+            values[colId] = 
_aggregationFunctions[i].extractFinalResult(values[colId]);
+          }
+        }
+      }
+    }
+  }
+
+  private int inferNumThreadsExtractFinalResult() {
+    if (_numThreadsExtractFinalResult > 1) {
+      return _numThreadsExtractFinalResult;
+    }
+    if (containsExpensiveAggregationFunctions()) {
+      int parallelChunkSize = _chunkSizeExtractFinalResult;
+      if (_topRecords != null && _topRecords.size() > parallelChunkSize) {
+        int estimatedThreads = (int) Math.ceil((double) _topRecords.size() / 
parallelChunkSize);
+        if (estimatedThreads == 0) {
+          return 1;
+        }
+        return Math.min(estimatedThreads, 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+      }
+    }
+    // Default to 1 thread
+    return 1;
+  }
+
+  private boolean containsExpensiveAggregationFunctions() {
+    for (AggregationFunction aggregationFunction : _aggregationFunctions) {
+      switch (aggregationFunction.getType()) {
+        case FUNNELCOMPLETECOUNT:
+        case FUNNELCOUNT:
+        case FUNNELMATCHSTEP:
+        case FUNNELMAXSTEP:
+          return true;
+        default:
+          break;
       }
     }
+    return false;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index df89c3a8e1..e05f8dea9c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -31,8 +32,9 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 public class SimpleIndexedTable extends IndexedTable {
 
   public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, 
QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, int initialCapacity) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, 
trimThreshold, new HashMap<>(initialCapacity));
+      int trimSize, int trimThreshold, int initialCapacity, ExecutorService 
executorService) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, 
trimThreshold, new HashMap<>(initialCapacity),
+        executorService);
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
index f397ac0e8c..93ec7a967a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.table;
 
+import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
@@ -36,8 +37,9 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
 
   public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean 
hasFinalInput, QueryContext queryContext,
-      int resultSize, int initialCapacity) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, 
Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
+      int resultSize, int initialCapacity, ExecutorService executorService) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, 
Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity,
+        executorService);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 633fb7d5e6..3439bc9109 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -111,7 +111,8 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
         if (_indexedTable == null) {
           synchronized (this) {
             if (_indexedTable == null) {
-              _indexedTable = 
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, 
_numTasks);
+              _indexedTable = 
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, 
_numTasks,
+                  _executorService);
             }
           }
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 82f1549971..82aa1e25d2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -94,6 +94,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD_KEY = 
"groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+  public static final int DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT = 1;
+  public static final int DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT = 10_000;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
 
@@ -268,6 +270,21 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
       } else {
         queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
       }
+      // Set numThreadsExtractFinalResult
+      Integer numThreadsExtractFinalResult = 
QueryOptionsUtils.getNumThreadsExtractFinalResult(queryOptions);
+      if (numThreadsExtractFinalResult != null) {
+        
queryContext.setNumThreadsExtractFinalResult(numThreadsExtractFinalResult);
+      } else {
+        
queryContext.setNumThreadsExtractFinalResult(DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT);
+      }
+      // Set chunkSizeExtractFinalResult
+      Integer chunkSizeExtractFinalResult =
+          QueryOptionsUtils.getChunkSizeExtractFinalResult(queryOptions);
+      if (chunkSizeExtractFinalResult != null) {
+        
queryContext.setChunkSizeExtractFinalResult(chunkSizeExtractFinalResult);
+      } else {
+        
queryContext.setChunkSizeExtractFinalResult(DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+      }
     }
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index c53be31ed5..e1db966f1b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -240,7 +240,7 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
     // Create an indexed table to perform the reduce.
     IndexedTable indexedTable =
         GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0), 
_queryContext, reducerContext,
-            numReduceThreadsToUse);
+            numReduceThreadsToUse, reducerContext.getExecutorService());
 
     // Create groups of data tables that each thread can process concurrently.
     // Given that numReduceThreads is <= numDataTables, each group will have 
at least one data table.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index e5ce066806..a804d1f3bb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -124,6 +124,11 @@ public class QueryContext {
   private int _minServerGroupTrimSize = 
InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
   // Trim threshold to use for server combine for SQL GROUP BY
   private int _groupTrimThreshold = 
InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
+  // Number of threads to use for final reduce
+  private int _numThreadsExtractFinalResult = 
InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT;
+  // Parallel chunk size for final reduce
+  private int _chunkSizeExtractFinalResult =
+      InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT;
   // Whether null handling is enabled
   private boolean _nullHandlingEnabled;
   // Whether server returns the final result
@@ -411,6 +416,22 @@ public class QueryContext {
     _groupTrimThreshold = groupTrimThreshold;
   }
 
+  public int getNumThreadsExtractFinalResult() {
+    return _numThreadsExtractFinalResult;
+  }
+
+  public void setNumThreadsExtractFinalResult(int 
numThreadsExtractFinalResult) {
+    _numThreadsExtractFinalResult = numThreadsExtractFinalResult;
+  }
+
+  public int getChunkSizeExtractFinalResult() {
+    return _chunkSizeExtractFinalResult;
+  }
+
+  public void setChunkSizeExtractFinalResult(int chunkSizeExtractFinalResult) {
+    _chunkSizeExtractFinalResult = chunkSizeExtractFinalResult;
+  }
+
   public boolean isNullHandlingEnabled() {
     return _nullHandlingEnabled;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index ac25d4a31b..5da707f61e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.util;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.HashUtil;
@@ -49,7 +50,7 @@ public final class GroupByUtils {
   /**
    * Returns the capacity of the table required by the given query.
    * NOTE: It returns {@code max(limit * 5, minNumGroups)} where minNumGroups 
is configurable to tune the table size and
-   *       result accuracy.
+   * result accuracy.
    */
   public static int getTableCapacity(int limit, int minNumGroups) {
     long capacityByLimit = limit * 5L;
@@ -93,7 +94,7 @@ public final class GroupByUtils {
    * Creates an indexed table for the combine operator given a sample results 
block.
    */
   public static IndexedTable 
createIndexedTableForCombineOperator(GroupByResultsBlock resultsBlock,
-      QueryContext queryContext, int numThreads) {
+      QueryContext queryContext, int numThreads, ExecutorService 
executorService) {
     DataSchema dataSchema = resultsBlock.getDataSchema();
     int numGroups = resultsBlock.getNumGroups();
     int limit = queryContext.getLimit();
@@ -119,7 +120,8 @@ public final class GroupByUtils {
         resultSize = limit;
       }
       int initialCapacity = getIndexedTableInitialCapacity(resultSize, 
numGroups, minInitialIndexedTableCapacity);
-      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, 
resultSize, initialCapacity, numThreads);
+      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, 
resultSize, initialCapacity, numThreads,
+          executorService);
     }
 
     int resultSize;
@@ -132,10 +134,11 @@ public final class GroupByUtils {
     int trimThreshold = getIndexedTableTrimThreshold(trimSize, 
queryContext.getGroupTrimThreshold());
     int initialCapacity = getIndexedTableInitialCapacity(trimThreshold, 
numGroups, minInitialIndexedTableCapacity);
     if (trimThreshold == Integer.MAX_VALUE) {
-      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, 
resultSize, initialCapacity, numThreads);
+      return getTrimDisabledIndexedTable(dataSchema, false, queryContext, 
resultSize, initialCapacity, numThreads,
+          executorService);
     } else {
       return getTrimEnabledIndexedTable(dataSchema, false, queryContext, 
resultSize, trimSize, trimThreshold,
-          initialCapacity, numThreads);
+          initialCapacity, numThreads, executorService);
     }
   }
 
@@ -143,7 +146,7 @@ public final class GroupByUtils {
    * Creates an indexed table for the data table reducer given a sample data 
table.
    */
   public static IndexedTable createIndexedTableForDataTableReducer(DataTable 
dataTable, QueryContext queryContext,
-      DataTableReducerContext reducerContext, int numThreads) {
+      DataTableReducerContext reducerContext, int numThreads, ExecutorService 
executorService) {
     DataSchema dataSchema = dataTable.getDataSchema();
     int numGroups = dataTable.getNumberOfRows();
     int limit = queryContext.getLimit();
@@ -166,39 +169,41 @@ public final class GroupByUtils {
     if (!hasOrderBy) {
       int initialCapacity = getIndexedTableInitialCapacity(resultSize, 
numGroups, minInitialIndexedTableCapacity);
       return getTrimDisabledIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, initialCapacity,
-          numThreads);
+          numThreads, executorService);
     }
 
     int trimThreshold = getIndexedTableTrimThreshold(trimSize, 
reducerContext.getGroupByTrimThreshold());
     int initialCapacity = getIndexedTableInitialCapacity(trimThreshold, 
numGroups, minInitialIndexedTableCapacity);
     if (trimThreshold == Integer.MAX_VALUE) {
       return getTrimDisabledIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, initialCapacity,
-          numThreads);
+          numThreads, executorService);
     } else {
       return getTrimEnabledIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, trimSize, trimThreshold,
-          initialCapacity, numThreads);
+          initialCapacity, numThreads, executorService);
     }
   }
 
   private static IndexedTable getTrimDisabledIndexedTable(DataSchema 
dataSchema, boolean hasFinalInput,
-      QueryContext queryContext, int resultSize, int initialCapacity, int 
numThreads) {
+      QueryContext queryContext, int resultSize, int initialCapacity, int 
numThreads, ExecutorService executorService) {
     if (numThreads == 1) {
       return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, 
resultSize, Integer.MAX_VALUE,
-          Integer.MAX_VALUE, initialCapacity);
+          Integer.MAX_VALUE, initialCapacity, executorService);
     } else {
-      return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, initialCapacity);
+      return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, initialCapacity,
+          executorService);
     }
   }
 
   private static IndexedTable getTrimEnabledIndexedTable(DataSchema 
dataSchema, boolean hasFinalInput,
-      QueryContext queryContext, int resultSize, int trimSize, int 
trimThreshold, int initialCapacity, int numThreads) {
+      QueryContext queryContext, int resultSize, int trimSize, int 
trimThreshold, int initialCapacity, int numThreads,
+      ExecutorService executorService) {
     assert trimThreshold != Integer.MAX_VALUE;
     if (numThreads == 1) {
       return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext, 
resultSize, trimSize, trimThreshold,
-          initialCapacity);
+          initialCapacity, executorService);
     } else {
       return new ConcurrentIndexedTable(dataSchema, hasFinalInput, 
queryContext, resultSize, trimSize, trimThreshold,
-          initialCapacity);
+          initialCapacity, executorService);
     }
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index e77e644fc3..1ebfaf2248 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -331,7 +332,7 @@ public class ResourceManagerAccountingTest {
     List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, 
NUM_ROWS, 0);
     IndexedTable indexedTable =
         new SimpleIndexedTable(dataSchema, false, queryContext, NUM_ROWS, 
Integer.MAX_VALUE, Integer.MAX_VALUE,
-            
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+            
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY, 
Executors.newCachedThreadPool());
     for (Object[] row : rows) {
       indexedTable.upsert(new Record(row));
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index af8d8cf2ff..913ce906f6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -57,7 +57,8 @@ public class IndexedTableTest {
         ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
     });
     IndexedTable indexedTable =
-        new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
+        new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY,
+            Executors.newCachedThreadPool());
 
     // 3 threads upsert together
     // a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times 
(20)
@@ -131,18 +132,22 @@ public class IndexedTableTest {
 
     // Test SimpleIndexedTable
     IndexedTable indexedTable =
-        new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY);
+        new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY,
+            Executors.newCachedThreadPool());
     IndexedTable mergeTable =
-        new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY);
+        new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY,
+            Executors.newCachedThreadPool());
     testNonConcurrent(indexedTable, mergeTable);
     indexedTable.finish(true);
     checkSurvivors(indexedTable, survivors);
 
     // Test ConcurrentIndexedTable
     indexedTable =
-        new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
+        new ConcurrentIndexedTable(dataSchema, false, queryContext, 5, 
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY,
+            Executors.newCachedThreadPool());
     mergeTable =
-        new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY);
+        new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE, 
TRIM_THRESHOLD, INITIAL_CAPACITY,
+            Executors.newCachedThreadPool());
     testNonConcurrent(indexedTable, mergeTable);
     indexedTable.finish(true);
     checkSurvivors(indexedTable, survivors);
@@ -260,11 +265,11 @@ public class IndexedTableTest {
 
     IndexedTable indexedTable =
         new SimpleIndexedTable(dataSchema, false, queryContext, 5, 
Integer.MAX_VALUE, Integer.MAX_VALUE,
-            INITIAL_CAPACITY);
+            INITIAL_CAPACITY, Executors.newCachedThreadPool());
     testNoMoreNewRecordsInTable(indexedTable);
 
     indexedTable = new ConcurrentIndexedTable(dataSchema, false, queryContext, 
5, Integer.MAX_VALUE, Integer.MAX_VALUE,
-        INITIAL_CAPACITY);
+        INITIAL_CAPACITY, Executors.newCachedThreadPool());
     testNoMoreNewRecordsInTable(indexedTable);
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
index 78fda6266e..84598ab45b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
@@ -873,7 +873,7 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -953,6 +953,6 @@ public class ArrayTest extends 
CustomDataQueryClusterIntegrationTest {
         ));
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
index 8e3f18c30d..6c798d1e68 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.List;
 import java.util.UUID;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -88,7 +89,7 @@ public class BytesTypeTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -153,8 +154,7 @@ public class BytesTypeTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-
-    return avroFile;
+    return List.of(avroFile);
   }
 
   private static String newRandomBase64String() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
index 85bcb59e8e..bf42a3ec5a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.List;
 import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -165,7 +166,7 @@ public class CpcSketchTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -188,7 +189,7 @@ public class CpcSketchTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   private byte[] getRandomRawValue() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 6f15e3be17..2d4569f012 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -94,22 +94,26 @@ public abstract class CustomDataQueryClusterIntegrationTest 
extends BaseClusterI
     Schema schema = createSchema();
     addSchema(schema);
 
-    File avroFile = createAvroFile();
+    List<File> avroFiles = createAvroFiles();
     if (isRealtimeTable()) {
       // create realtime table
-      TableConfig tableConfig = createRealtimeTableConfig(avroFile);
+      TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
       addTableConfig(tableConfig);
 
       // Push data into Kafka
-      pushAvroIntoKafka(List.of(avroFile));
+      pushAvroIntoKafka(avroFiles);
     } else {
       // create offline table
       TableConfig tableConfig = createOfflineTableConfig();
       addTableConfig(tableConfig);
 
       // create & upload segments
-      ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, 
schema, 0, _segmentDir, _tarDir);
-      uploadSegments(getTableName(), _tarDir);
+      int segmentIndex = 0;
+      for (File avroFile : avroFiles) {
+        ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, 
tableConfig, schema, segmentIndex++, _segmentDir,
+            _tarDir);
+        uploadSegments(getTableName(), _tarDir);
+      }
     }
 
     waitForAllDocsLoaded(60_000);
@@ -247,7 +251,7 @@ public abstract class CustomDataQueryClusterIntegrationTest 
extends BaseClusterI
   @Override
   public abstract Schema createSchema();
 
-  public abstract File createAvroFile()
+  public abstract List<File> createAvroFiles()
       throws Exception;
 
   public boolean isRealtimeTable() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
index 6c76127021..7adc80628c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
@@ -72,7 +72,7 @@ public class FloatingPointDataTypeTest extends 
CustomDataQueryClusterIntegration
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws IOException {
 
     // create avro schema
@@ -124,7 +124,7 @@ public class FloatingPointDataTypeTest extends 
CustomDataQueryClusterIntegration
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
index b5cf20019b..6b11a8da3a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.List;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -130,7 +131,7 @@ public class GeoSpatialTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -185,7 +186,7 @@ public class GeoSpatialTest extends 
CustomDataQueryClusterIntegrationTest {
       }
     }
 
-    return avroFile;
+    return List.of(avroFile);
   }
 
   @Test(dataProvider = "useBothQueryEngines")
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index 7dd460d1f5..d865d7defd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -96,7 +96,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     List<org.apache.avro.Schema.Field> fields =
@@ -130,7 +130,7 @@ public class JsonPathTest extends 
CustomDataQueryClusterIntegrationTest {
     }
     Collections.sort(_sortedSequenceIds);
 
-    return avroFile;
+    return List.of(avroFile);
   }
 
   @Test(dataProvider = "useBothQueryEngines")
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
index a9cee052b1..8d54850f23 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
@@ -92,7 +92,7 @@ public class MapFieldTypeRealtimeTest extends 
CustomDataQueryClusterIntegrationT
         .build();
   }
 
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     org.apache.avro.Schema stringMapAvroSchema =
@@ -126,7 +126,7 @@ public class MapFieldTypeRealtimeTest extends 
CustomDataQueryClusterIntegrationT
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   protected int getSelectionDefaultDocCount() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
index 74ce24e3d7..e906c5b865 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
@@ -89,7 +89,7 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
         .build();
   }
 
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     org.apache.avro.Schema stringMapAvroSchema =
@@ -119,7 +119,7 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   protected int getSelectionDefaultDocCount() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
index 4c5571c9a1..578ee5e5ea 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
@@ -86,7 +86,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
         .build();
   }
 
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     org.apache.avro.Schema stringKeyMapAvroSchema =
@@ -116,7 +116,7 @@ public class MapTypeTest extends 
CustomDataQueryClusterIntegrationTest {
       }
     }
 
-    return avroFile;
+    return List.of(avroFile);
   }
 
   protected int getSelectionDefaultDocCount() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
index b087913c7c..2677211430 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -64,7 +65,7 @@ public class SumPrecisionTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws IOException {
 
     // create avro schema
@@ -103,7 +104,7 @@ public class SumPrecisionTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
index 9d963eab75..353cd00396 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
@@ -132,7 +132,7 @@ public class TextIndicesTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // Read all skills from the skill file
     InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt");
@@ -164,7 +164,7 @@ public class TextIndicesTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
index e9b577d977..e63e0ad99c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
@@ -87,7 +87,7 @@ public class ThetaSketchTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws IOException {
 
     // create avro schema
@@ -171,7 +171,7 @@ public class ThetaSketchTest extends 
CustomDataQueryClusterIntegrationTest {
       }
     }
 
-    return avroFile;
+    return List.of(avroFile);
   }
 
   @Test(dataProvider = "useV1QueryEngine")
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
index 60e63898f4..483370b247 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.sql.Timestamp;
+import java.util.List;
 import java.util.TimeZone;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -461,7 +462,7 @@ public class TimestampTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -533,6 +534,6 @@ public class TimestampTest extends 
CustomDataQueryClusterIntegrationTest {
         tsBaseLong += 86400000;
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
index e4cd62c302..d39c64db57 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.List;
 import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -279,7 +280,7 @@ public class TupleSketchTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -303,7 +304,7 @@ public class TupleSketchTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   private byte[] getRandomRawValue() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
index ccf82c2181..d017a71db8 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.List;
 import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -123,7 +124,7 @@ public class ULLTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -146,7 +147,7 @@ public class ULLTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   private byte[] getRandomRawValue() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
index 78a078d3d9..da2e03c9fe 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
@@ -252,7 +252,7 @@ public class VectorTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -320,7 +320,7 @@ public class VectorTest extends 
CustomDataQueryClusterIntegrationTest {
         fileWriter.append(record);
       }
     }
-    return avroFile;
+    return List.of(avroFile);
   }
 
   private float[] createZeroVector(int vectorDimSize) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
index d185837c22..a318e5698c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
@@ -21,6 +21,9 @@ package org.apache.pinot.integration.tests.custom;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -212,7 +215,6 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
     }
   }
 
-
   @Test(dataProvider = "useBothQueryEngines")
   public void testFunnelMaxStepGroupByQueriesWithModeKeepAll(boolean 
useMultiStageQueryEngine)
       throws Exception {
@@ -476,6 +478,53 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
     }
   }
 
+  @Test(dataProvider = "useV2QueryEngine", invocationCount = 10, 
threadPoolSize = 5)
+  public void testFunnelMatchStepWithMultiThreadsReduce(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    int numThreadsExtractFinalResult = 2 + new Random().nextInt(10);
+    LOGGER.info("Running testFunnelMatchStepWithMultiThreadsReduce with 
numThreadsExtractFinalResult: {}",
+        numThreadsExtractFinalResult);
+    String query =
+        String.format("SET numThreadsExtractFinalResult=" + 
numThreadsExtractFinalResult + "; "
+            + "SELECT "
+            + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "'strict_increase' ) "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d ", 
getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < 40; i++) {
+      JsonNode row = rows.get(i);
+      assertEquals(row.size(), 2);
+      assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      int sumSteps = 0;
+      for (JsonNode step : row.get(1)) {
+        sumSteps += step.intValue();
+      }
+      switch (i / 10) {
+        case 0:
+          assertEquals(sumSteps, 4);
+          break;
+        case 1:
+          assertEquals(sumSteps, 2);
+          break;
+        case 2:
+          assertEquals(sumSteps, 3);
+          break;
+        case 3:
+          assertEquals(sumSteps, 1);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testFunnelMatchStepGroupByQueriesWithModeSkipLeaf(boolean 
useMultiStageQueryEngine)
       throws Exception {
@@ -860,7 +909,7 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
   }
 
   @Override
-  public File createAvroFile()
+  public List<File> createAvroFiles()
       throws Exception {
     // create avro schema
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -895,10 +944,11 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
     }
     _countStarResult = totalRows * repeats;
     // create avro file
-    File avroFile = new File(_tempDir, "data.avro");
-    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-      fileWriter.create(avroSchema, avroFile);
-      for (int repeat = 0; repeat < repeats; repeat++) {
+    List<File> avroFiles = new ArrayList<>();
+    for (int repeat = 0; repeat < repeats; repeat++) {
+      File avroFile = new File(_tempDir, "data" + repeat + ".avro");
+      try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        fileWriter.create(avroSchema, avroFile);
         for (int i = 0; i < userUrlValues.length; i++) {
           for (int j = 0; j < userUrlValues[i].length; j++) {
             GenericData.Record record = new GenericData.Record(avroSchema);
@@ -909,7 +959,8 @@ public class WindowFunnelTest extends 
CustomDataQueryClusterIntegrationTest {
           }
         }
       }
+      avroFiles.add(avroFile);
     }
-    return avroFile;
+    return avroFiles;
   }
 }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 579bd5b227..a8fd8cf98d 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -121,7 +121,7 @@ public class BenchmarkCombineGroupBy {
     IndexedTable concurrentIndexedTable =
         new ConcurrentIndexedTable(_dataSchema, false, _queryContext, 
trimSize, trimSize,
             InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD,
-            
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+            
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY, 
_executorService);
 
     List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
 
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 6c9667533b..7f9be99ea9 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -119,7 +119,7 @@ public class BenchmarkIndexedTable {
     // make 1 concurrent table
     IndexedTable concurrentIndexedTable =
         new ConcurrentIndexedTable(_dataSchema, false, _queryContext, 
TRIM_SIZE, TRIM_SIZE, TRIM_THRESHOLD,
-            TRIM_THRESHOLD);
+            TRIM_THRESHOLD, _executorService);
 
     // 10 parallel threads putting 10k records into the table
 
@@ -169,7 +169,7 @@ public class BenchmarkIndexedTable {
       // make 10 indexed tables
       IndexedTable simpleIndexedTable =
           new SimpleIndexedTable(_dataSchema, false, _queryContext, TRIM_SIZE, 
TRIM_SIZE, TRIM_THRESHOLD,
-              TRIM_THRESHOLD);
+              TRIM_THRESHOLD, _executorService);
       simpleIndexedTables.add(simpleIndexedTable);
 
       // put 10k records in each indexed table, in parallel
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index e3c3e0d483..30f4b44e27 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -429,12 +429,21 @@ public class CommonConstants {
          * Trimming happens only when (sub)query contains order by clause. */
         public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = 
"minSegmentGroupTrimSize";
 
-        /** Max number of groups GroupByCombineOperator (running at server) 
should return .*/
+        /** Max number of groups GroupByCombineOperator (running at server) 
should return. */
         public static final String MIN_SERVER_GROUP_TRIM_SIZE = 
"minServerGroupTrimSize";
 
         /** Max number of groups GroupByDataTableReducer (running at broker) 
should return. */
         public static final String MIN_BROKER_GROUP_TRIM_SIZE = 
"minBrokerGroupTrimSize";
 
+        /** Number of threads used in the final reduce.
+         * This is useful for expensive aggregation functions. E.g. Funnel 
queries are considered as expensive
+         * aggregation functions. */
+        public static final String NUM_THREADS_EXTRACT_FINAL_RESULT = 
"numThreadsExtractFinalResult";
+
+        /** Chunk size of the group-by results processed per thread in the final reduce at broker level. */
+        public static final String CHUNK_SIZE_EXTRACT_FINAL_RESULT =
+            "chunkSizeExtractFinalResult";
+
         public static final String NUM_REPLICA_GROUPS_TO_QUERY = 
"numReplicaGroupsToQuery";
         public static final String USE_FIXED_REPLICA = "useFixedReplica";
         public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to