This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0516d0dae43 simplify IncrementalIndex since group-by v1 has been 
removed (#15448)
0516d0dae43 is described below

commit 0516d0dae432058848806e49e71ac73333bf751c
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Nov 29 14:46:16 2023 -0800

    simplify IncrementalIndex since group-by v1 has been removed (#15448)
---
 .../benchmark/GroupByTypeInterfaceBenchmark.java   |   1 -
 .../IncrementalIndexRowTypeBenchmark.java          |   1 -
 .../druid/benchmark/query/GroupByBenchmark.java    |   1 -
 .../DistinctCountGroupByQueryTest.java             |   1 -
 .../org/apache/druid/indexer/InputRowSerde.java    |   2 +-
 .../incremental/AppendableIndexBuilder.java        |  15 --
 .../segment/incremental/IncrementalIndex.java      | 284 +--------------------
 .../segment/incremental/IncrementalIndexRow.java   |   2 +-
 .../incremental/OnheapIncrementalIndex.java        | 249 +++++++++++++++---
 .../query/aggregation/AggregationTestHelper.java   |   8 -
 .../aggregation/StringColumnAggregationTest.java   |   1 -
 .../query/aggregation/mean/SimpleTestIndex.java    |   1 -
 ...GroupByLimitPushDownInsufficientBufferTest.java |   1 -
 .../GroupByLimitPushDownMultiNodeMergeTest.java    |   1 -
 .../query/groupby/GroupByMultiSegmentTest.java     |   1 -
 .../groupby/GroupByQueryRunnerFactoryTest.java     |   1 -
 .../query/groupby/NestedQueryPushDownTest.java     |   1 -
 .../incremental/IncrementalIndexCreator.java       |   7 +-
 .../segment/incremental/IncrementalIndexTest.java  |   9 +-
 .../OnheapIncrementalIndexBenchmark.java           |  11 +-
 20 files changed, 234 insertions(+), 364 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 2d902c12163..0fe549834f0 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -392,7 +392,6 @@ public class GroupByTypeInterfaceBenchmark
   {
     return new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(rowsPerSegment)
         .build();
   }
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
index 9687a888d91..395f9d6d7c7 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
@@ -135,7 +135,6 @@ public class IncrementalIndexRowTypeBenchmark
   {
     return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(aggs)
-        .setDeserializeComplexMetrics(false)
         .setMaxRowCount(rowsPerSegment)
         .build();
   }
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index d355dd2d005..efb6bed6cb6 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -608,7 +608,6 @@ public class GroupByBenchmark
                 .withRollup(withRollup)
                 .build()
         )
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(rowsPerSegment)
         .build();
   }
diff --git 
a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
 
b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
index c7946255b07..eaef749c324 100644
--- 
a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
+++ 
b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
@@ -88,7 +88,6 @@ public class DistinctCountGroupByQueryTest extends 
InitializedNullHandlingTest
                 .withMetrics(new CountAggregatorFactory("cnt"))
                 .build()
         )
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .build();
 
diff --git 
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java 
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
index 7be4da386d8..22b140222c7 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
@@ -330,7 +330,7 @@ public class InputRowSerde
         writeString(k, out);
 
         try (Aggregator agg = aggFactory.factorize(
-            IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, 
aggFactory, supplier, true)
+            IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, 
aggFactory, supplier)
         )) {
           try {
             agg.aggregate();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index aa739b3f744..777c4cc1fd7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -29,9 +29,6 @@ public abstract class AppendableIndexBuilder
 {
   @Nullable
   protected IncrementalIndexSchema incrementalIndexSchema = null;
-  protected boolean deserializeComplexMetrics = true;
-  protected boolean concurrentEventAdd = false;
-  protected boolean sortFacts = true;
   protected int maxRowCount = 0;
   protected long maxBytesInMemory = 0;
   // When set to true, for any row that already has metric (with the same name 
defined in metricSpec),
@@ -88,18 +85,6 @@ public abstract class AppendableIndexBuilder
     return this;
   }
 
-  public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean 
deserializeComplexMetrics)
-  {
-    this.deserializeComplexMetrics = deserializeComplexMetrics;
-    return this;
-  }
-
-  public AppendableIndexBuilder setConcurrentEventAdd(final boolean 
concurrentEventAdd)
-  {
-    this.concurrentEventAdd = concurrentEventAdd;
-    return this;
-  }
-
   public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
   {
     this.maxRowCount = maxRowCount;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index db72ed37171..eca175267a7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -25,7 +25,6 @@ import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
@@ -84,23 +83,15 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Comparator;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
 
 public abstract class IncrementalIndex implements Iterable<Row>, Closeable, 
ColumnInspector
 {
@@ -109,22 +100,19 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
    *
    * @param agg                       the aggregator
    * @param in                        ingestion-time input row supplier
-   * @param deserializeComplexMetrics whether complex objects should be 
deserialized by a {@link ComplexMetricExtractor}
-   *
    * @return column selector factory
    */
   public static ColumnSelectorFactory makeColumnSelectorFactory(
       final VirtualColumns virtualColumns,
       final AggregatorFactory agg,
-      final Supplier<InputRow> in,
-      final boolean deserializeComplexMetrics
+      final Supplier<InputRow> in
   )
   {
     // we use RowSignature.empty() because ColumnInspector here should be the 
InputRow schema, not the
     // IncrementalIndex schema, because we are reading values from the InputRow
     final RowBasedColumnSelectorFactory<InputRow> baseSelectorFactory = 
RowBasedColumnSelectorFactory.create(
         RowAdapters.standardRow(),
-        in::get,
+        in,
         RowSignature.empty(),
         true,
         true
@@ -135,11 +123,9 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
       @Override
       public ColumnValueSelector<?> makeColumnValueSelector(final String 
column)
       {
-        final boolean isComplexMetric = 
agg.getIntermediateType().is(ValueType.COMPLEX);
-
         final ColumnValueSelector selector = 
baseSelectorFactory.makeColumnValueSelector(column);
 
-        if (!isComplexMetric || !deserializeComplexMetrics) {
+        if (!agg.getIntermediateType().is(ValueType.COMPLEX)) {
           return selector;
         } else {
           // Wrap selector in a special one that uses ComplexMetricSerde to 
modify incoming objects.
@@ -226,7 +212,6 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
   private final List<Function<InputRow, InputRow>> rowTransformers;
   private final VirtualColumns virtualColumns;
   private final AggregatorFactory[] metrics;
-  private final boolean deserializeComplexMetrics;
   private final Metadata metadata;
   protected final boolean preserveExistingMetrics;
 
@@ -252,16 +237,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
 
 
   /**
-   * Setting deserializeComplexMetrics to false is necessary for intermediate 
aggregation such as groupBy that
-   * should not deserialize input columns using ComplexMetricSerde for 
aggregators that return complex metrics.
-   * <p>
-   * Set concurrentEventAdd to true to indicate that adding of input row 
should be thread-safe (for example, groupBy
-   * where the multiple threads can add concurrently to the IncrementalIndex).
-   *
    * @param incrementalIndexSchema    the schema to use for incremental index
-   * @param deserializeComplexMetrics flag whether or not to call 
ComplexMetricExtractor.extractValue() on the input
-   *                                  value for aggregators that return 
metrics other than float.
-   * @param concurrentEventAdd        flag whether ot not adding of input rows 
should be thread-safe
    * @param preserveExistingMetrics   When set to true, for any row that 
already has metric
    *                                  (with the same name defined in 
metricSpec), the metric aggregator in metricSpec
    *                                  is skipped and the existing metric is 
unchanged. If the row does not already have
@@ -273,8 +249,6 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
    */
   protected IncrementalIndex(
       final IncrementalIndexSchema incrementalIndexSchema,
-      final boolean deserializeComplexMetrics,
-      final boolean concurrentEventAdd,
       final boolean preserveExistingMetrics,
       final boolean useMaxMemoryEstimates
   )
@@ -285,7 +259,6 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     this.virtualColumns = incrementalIndexSchema.getVirtualColumns();
     this.metrics = incrementalIndexSchema.getMetrics();
     this.rowTransformers = new CopyOnWriteArrayList<>();
-    this.deserializeComplexMetrics = deserializeComplexMetrics;
     this.preserveExistingMetrics = preserveExistingMetrics;
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
     this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec()
@@ -303,7 +276,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
         this.rollup
     );
 
-    initAggs(metrics, rowSupplier, deserializeComplexMetrics, 
concurrentEventAdd);
+    initAggs(metrics, rowSupplier);
 
     for (AggregatorFactory metric : metrics) {
       MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric);
@@ -359,9 +332,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
 
   protected abstract void initAggs(
       AggregatorFactory[] metrics,
-      Supplier<InputRow> rowSupplier,
-      boolean deserializeComplexMetrics,
-      boolean concurrentEventAdd
+      Supplier<InputRow> rowSupplier
   );
 
   // Note: This method needs to be thread safe.
@@ -740,11 +711,6 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     return numEntries.get();
   }
 
-  boolean getDeserializeComplexMetrics()
-  {
-    return deserializeComplexMetrics;
-  }
-
   AtomicInteger getNumEntries()
   {
     return numEntries;
@@ -1054,11 +1020,10 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
 
   protected ColumnSelectorFactory makeColumnSelectorFactory(
       final AggregatorFactory agg,
-      final Supplier<InputRow> in,
-      final boolean deserializeComplexMetrics
+      final Supplier<InputRow> in
   )
   {
-    return makeColumnSelectorFactory(virtualColumns, agg, in, 
deserializeComplexMetrics);
+    return makeColumnSelectorFactory(virtualColumns, agg, in);
   }
 
   protected final Comparator<IncrementalIndexRow> dimsComparator()
@@ -1127,7 +1092,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     return true;
   }
 
-  interface FactsHolder
+  public interface FactsHolder
   {
     /**
      * @return the previous rowIndex associated with the specified key, or
@@ -1161,232 +1126,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     void clear();
   }
 
-  static class RollupFactsHolder implements FactsHolder
-  {
-    private final boolean sortFacts;
-    // Can't use Set because we need to be able to get from collection
-    private final ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> 
facts;
-    private final List<DimensionDesc> dimensionDescsList;
-
-    RollupFactsHolder(
-        boolean sortFacts,
-        Comparator<IncrementalIndexRow> incrementalIndexRowComparator,
-        List<DimensionDesc> dimensionDescsList
-    )
-    {
-      this.sortFacts = sortFacts;
-      if (sortFacts) {
-        this.facts = new 
ConcurrentSkipListMap<>(incrementalIndexRowComparator);
-      } else {
-        this.facts = new ConcurrentHashMap<>();
-      }
-      this.dimensionDescsList = dimensionDescsList;
-    }
-
-    @Override
-    public int getPriorIndex(IncrementalIndexRow key)
-    {
-      IncrementalIndexRow row = facts.get(key);
-      return row == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : 
row.getRowIndex();
-    }
-
-    @Override
-    public long getMinTimeMillis()
-    {
-      if (sortFacts) {
-        return ((ConcurrentNavigableMap<IncrementalIndexRow, 
IncrementalIndexRow>) facts).firstKey().getTimestamp();
-      } else {
-        throw new UnsupportedOperationException("can't get minTime from 
unsorted facts data.");
-      }
-    }
-
-    @Override
-    public long getMaxTimeMillis()
-    {
-      if (sortFacts) {
-        return ((ConcurrentNavigableMap<IncrementalIndexRow, 
IncrementalIndexRow>) facts).lastKey().getTimestamp();
-      } else {
-        throw new UnsupportedOperationException("can't get maxTime from 
unsorted facts data.");
-      }
-    }
-
-    @Override
-    public Iterator<IncrementalIndexRow> iterator(boolean descending)
-    {
-      if (descending && sortFacts) {
-        return ((ConcurrentNavigableMap<IncrementalIndexRow, 
IncrementalIndexRow>) facts).descendingMap()
-                                                                               
          .keySet()
-                                                                               
          .iterator();
-      }
-      return keySet().iterator();
-    }
-
-    @Override
-    public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, 
long timeStart, long timeEnd)
-    {
-      if (!sortFacts) {
-        throw new UnsupportedOperationException("can't get timeRange from 
unsorted facts data.");
-      }
-      IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new 
Object[]{}, dimensionDescsList);
-      IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new 
Object[]{}, dimensionDescsList);
-      ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> subMap =
-          ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) 
facts).subMap(start, end);
-      ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap = 
descending ? subMap.descendingMap() : subMap;
-      return rangeMap.keySet();
-    }
-
-    @Override
-    public Iterable<IncrementalIndexRow> keySet()
-    {
-      return facts.keySet();
-    }
-
-    @Override
-    public Iterable<IncrementalIndexRow> persistIterable()
-    {
-      // with rollup, facts are already pre-sorted so just return keyset
-      return keySet();
-    }
-
-    @Override
-    public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
-    {
-      // setRowIndex() must be called before facts.putIfAbsent() for 
visibility of rowIndex from concurrent readers.
-      key.setRowIndex(rowIndex);
-      IncrementalIndexRow prev = facts.putIfAbsent(key, key);
-      return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : 
prev.getRowIndex();
-    }
-
-    @Override
-    public void clear()
-    {
-      facts.clear();
-    }
-  }
-
-  static class PlainFactsHolder implements FactsHolder
-  {
-    private final boolean sortFacts;
-    private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
-
-    private final Comparator<IncrementalIndexRow> 
incrementalIndexRowComparator;
-
-    public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow> 
incrementalIndexRowComparator)
-    {
-      this.sortFacts = sortFacts;
-      if (sortFacts) {
-        this.facts = new ConcurrentSkipListMap<>();
-      } else {
-        this.facts = new ConcurrentHashMap<>();
-      }
-      this.incrementalIndexRowComparator = incrementalIndexRowComparator;
-    }
-
-    @Override
-    public int getPriorIndex(IncrementalIndexRow key)
-    {
-      // always return EMPTY_ROW_INDEX to indicate that no prior key cause we 
always add new row
-      return IncrementalIndexRow.EMPTY_ROW_INDEX;
-    }
-
-    @Override
-    public long getMinTimeMillis()
-    {
-      if (sortFacts) {
-        return ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) 
facts).firstKey();
-      } else {
-        throw new UnsupportedOperationException("can't get minTime from 
unsorted facts data.");
-      }
-    }
-
-    @Override
-    public long getMaxTimeMillis()
-    {
-      if (sortFacts) {
-        return ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) 
facts).lastKey();
-      } else {
-        throw new UnsupportedOperationException("can't get maxTime from 
unsorted facts data.");
-      }
-    }
-
-    @Override
-    public Iterator<IncrementalIndexRow> iterator(boolean descending)
-    {
-      if (descending && sortFacts) {
-        return timeOrderedConcat(((ConcurrentNavigableMap<Long, 
Deque<IncrementalIndexRow>>) facts)
-                                     .descendingMap().values(), 
true).iterator();
-      }
-      return timeOrderedConcat(facts.values(), false).iterator();
-    }
-
-    @Override
-    public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, 
long timeStart, long timeEnd)
-    {
-      ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
-          ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) 
facts).subMap(timeStart, timeEnd);
-      final ConcurrentMap<Long, Deque<IncrementalIndexRow>> rangeMap = 
descending ? subMap.descendingMap() : subMap;
-      return timeOrderedConcat(rangeMap.values(), descending);
-    }
-
-    private Iterable<IncrementalIndexRow> timeOrderedConcat(
-        final Iterable<Deque<IncrementalIndexRow>> iterable,
-        final boolean descending
-    )
-    {
-      return () -> Iterators.concat(
-          Iterators.transform(
-              iterable.iterator(),
-              input -> descending ? input.descendingIterator() : 
input.iterator()
-          )
-      );
-    }
-
-    private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
-        final Collection<Deque<IncrementalIndexRow>> rowGroups
-    )
-    {
-      return rowGroups.stream()
-                      .flatMap(Collection::stream)
-                      .sorted(incrementalIndexRowComparator);
-    }
-
-    @Override
-    public Iterable<IncrementalIndexRow> keySet()
-    {
-      return timeOrderedConcat(facts.values(), false);
-    }
-
-    @Override
-    public Iterable<IncrementalIndexRow> persistIterable()
-    {
-      return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
-    }
-
-    @Override
-    public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
-    {
-      Long time = key.getTimestamp();
-      Deque<IncrementalIndexRow> rows = facts.get(time);
-      if (rows == null) {
-        facts.putIfAbsent(time, new ConcurrentLinkedDeque<>());
-        // in race condition, rows may be put by other thread, so always get 
latest status from facts
-        rows = facts.get(time);
-      }
-      // setRowIndex() must be called before rows.add() for visibility of 
rowIndex from concurrent readers.
-      key.setRowIndex(rowIndex);
-      rows.add(key);
-      // always return EMPTY_ROW_INDEX to indicate that we always add new row
-      return IncrementalIndexRow.EMPTY_ROW_INDEX;
-    }
-
-    @Override
-    public void clear()
-    {
-      facts.clear();
-    }
-  }
-
-  private class LongMetricColumnSelector implements LongColumnSelector
+  private final class LongMetricColumnSelector implements LongColumnSelector
   {
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
@@ -1417,7 +1157,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     }
   }
 
-  private class ObjectMetricColumnSelector extends ObjectColumnSelector
+  private final class ObjectMetricColumnSelector extends ObjectColumnSelector
   {
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
@@ -1454,7 +1194,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     }
   }
 
-  private class FloatMetricColumnSelector implements FloatColumnSelector
+  private final class FloatMetricColumnSelector implements FloatColumnSelector
   {
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
@@ -1485,7 +1225,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     }
   }
 
-  private class DoubleMetricColumnSelector implements DoubleColumnSelector
+  private final class DoubleMetricColumnSelector implements 
DoubleColumnSelector
   {
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
index 987ee5f8bf7..70a589f3950 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
@@ -42,7 +42,7 @@ public final class IncrementalIndexRow
    * rowIndex is not checked in {@link #equals} and {@link #hashCode} on 
purpose. IncrementalIndexRow acts as a Map key
    * and "entry" object (rowIndex is the "value") at the same time. This is 
done to reduce object indirection and
    * improve locality, and avoid boxing of rowIndex as Integer, when stored in 
JDK collection:
-   * {@link IncrementalIndex.RollupFactsHolder} needs concurrent collections, 
that are not present in fastutil.
+   * {@link OnheapIncrementalIndex.RollupFactsHolder} needs concurrent 
collections, that are not present in fastutil.
    */
   private int rowIndex;
   private long dimsKeySize;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 8e253f8e903..3449226e4ce 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -47,14 +47,23 @@ import org.apache.druid.utils.JvmUtils;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.stream.Stream;
 
 /**
  *
@@ -72,7 +81,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION = 
100;
 
   /**
-   * overhead per {@link ConcurrentHashMap.Node}  or {@link 
java.util.concurrent.ConcurrentSkipListMap.Node} object
+   * overhead per {@link ConcurrentSkipListMap.Node} object in facts table
    */
   private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + 
Integer.BYTES;
   private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new 
ConcurrentHashMap<>();
@@ -118,28 +127,19 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
 
   OnheapIncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
-      boolean deserializeComplexMetrics,
-      boolean concurrentEventAdd,
-      boolean sortFacts,
       int maxRowCount,
       long maxBytesInMemory,
-      // preserveExistingMetrics should only be set true for DruidInputSource 
since that is the only case where we can have existing metrics
-      // This is currently only use by auto compaction and should not be use 
for anything else.
+      // preserveExistingMetrics should only be set true for DruidInputSource 
since that is the only case where we can
+      // have existing metrics. This is currently only use by auto compaction 
and should not be use for anything else.
       boolean preserveExistingMetrics,
       boolean useMaxMemoryEstimates
   )
   {
-    super(
-        incrementalIndexSchema,
-        deserializeComplexMetrics,
-        concurrentEventAdd,
-        preserveExistingMetrics,
-        useMaxMemoryEstimates
-    );
+    super(incrementalIndexSchema, preserveExistingMetrics, 
useMaxMemoryEstimates);
     this.maxRowCount = maxRowCount;
     this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : 
maxBytesInMemory;
-    this.facts = incrementalIndexSchema.isRollup() ? new 
RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
-                                                   : new 
PlainFactsHolder(sortFacts, dimsComparator());
+    this.facts = incrementalIndexSchema.isRollup() ? new 
RollupFactsHolder(dimsComparator(), getDimensions())
+                                                   : new 
PlainFactsHolder(dimsComparator());
     maxBytesPerRowForAggregators =
         useMaxMemoryEstimates ? 
getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0;
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
@@ -190,9 +190,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   @Override
   protected void initAggs(
       final AggregatorFactory[] metrics,
-      final Supplier<InputRow> rowSupplier,
-      final boolean deserializeComplexMetrics,
-      final boolean concurrentEventAdd
+      final Supplier<InputRow> rowSupplier
   )
   {
     selectors = new HashMap<>();
@@ -200,18 +198,14 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
     for (AggregatorFactory agg : metrics) {
       selectors.put(
           agg.getName(),
-          new CachingColumnSelectorFactory(
-              makeColumnSelectorFactory(agg, rowSupplier, 
deserializeComplexMetrics),
-              concurrentEventAdd
-          )
+          new CachingColumnSelectorFactory(makeColumnSelectorFactory(agg, 
rowSupplier))
       );
       if (preserveExistingMetrics) {
         AggregatorFactory combiningAgg = agg.getCombiningFactory();
         combiningAggSelectors.put(
             combiningAgg.getName(),
             new CachingColumnSelectorFactory(
-                makeColumnSelectorFactory(combiningAgg, rowSupplier, 
deserializeComplexMetrics),
-                concurrentEventAdd
+                makeColumnSelectorFactory(combiningAgg, rowSupplier)
             )
         );
       }
@@ -550,6 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
    * If preserveExistingMetrics flag is set, then this method will combine 
values from two aggregators, the aggregator
    * for aggregating from input into output field and the aggregator for 
combining already aggregated field, as needed
    */
+  @Nullable
   private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] 
aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
   {
     if (preserveExistingMetrics) {
@@ -605,18 +600,13 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
    */
   static class CachingColumnSelectorFactory implements ColumnSelectorFactory
   {
-    private final Map<String, ColumnValueSelector<?>> columnSelectorMap;
+    private final HashMap<String, ColumnValueSelector<?>> columnSelectorMap;
     private final ColumnSelectorFactory delegate;
 
-    public CachingColumnSelectorFactory(ColumnSelectorFactory delegate, 
boolean concurrentEventAdd)
+    public CachingColumnSelectorFactory(ColumnSelectorFactory delegate)
     {
       this.delegate = delegate;
-
-      if (concurrentEventAdd) {
-        columnSelectorMap = new ConcurrentHashMap<>();
-      } else {
-        columnSelectorMap = new HashMap<>();
-      }
+      this.columnSelectorMap = new HashMap<>();
     }
 
     @Override
@@ -628,7 +618,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     @Override
     public ColumnValueSelector<?> makeColumnValueSelector(String columnName)
     {
-      ColumnValueSelector existing = columnSelectorMap.get(columnName);
+      ColumnValueSelector<?> existing = columnSelectorMap.get(columnName);
       if (existing != null) {
         return existing;
       }
@@ -656,9 +646,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     {
       return new OnheapIncrementalIndex(
           Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema 
is null"),
-          deserializeComplexMetrics,
-          concurrentEventAdd,
-          sortFacts,
           maxRowCount,
           maxBytesInMemory,
           preserveExistingMetrics,
@@ -734,4 +721,194 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
       return Objects.hash(preserveExistingMetrics);
     }
   }
+
+  static final class RollupFactsHolder implements FactsHolder
+  {
+    // Can't use Set because we need to be able to get from collection
+    private final ConcurrentNavigableMap<IncrementalIndexRow, 
IncrementalIndexRow> facts;
+    private final List<DimensionDesc> dimensionDescsList;
+
+    RollupFactsHolder(
+        Comparator<IncrementalIndexRow> incrementalIndexRowComparator,
+        List<DimensionDesc> dimensionDescsList
+    )
+    {
+      this.facts = new ConcurrentSkipListMap<>(incrementalIndexRowComparator);
+      this.dimensionDescsList = dimensionDescsList;
+    }
+
+    @Override
+    public int getPriorIndex(IncrementalIndexRow key)
+    {
+      IncrementalIndexRow row = facts.get(key);
+      return row == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : 
row.getRowIndex();
+    }
+
+    @Override
+    public long getMinTimeMillis()
+    {
+      return facts.firstKey().getTimestamp();
+    }
+
+    @Override
+    public long getMaxTimeMillis()
+    {
+      return facts.lastKey().getTimestamp();
+    }
+
+    @Override
+    public Iterator<IncrementalIndexRow> iterator(boolean descending)
+    {
+      if (descending) {
+        return facts.descendingMap()
+                    .keySet()
+                    .iterator();
+      }
+      return keySet().iterator();
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, 
long timeStart, long timeEnd)
+    {
+      IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new 
Object[]{}, dimensionDescsList);
+      IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new 
Object[]{}, dimensionDescsList);
+      ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> subMap 
= facts.subMap(start, end);
+      ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap = 
descending ? subMap.descendingMap() : subMap;
+      return rangeMap.keySet();
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> keySet()
+    {
+      return facts.keySet();
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      // with rollup, facts are already pre-sorted so just return keyset
+      return keySet();
+    }
+
+    @Override
+    public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
+    {
+      // setRowIndex() must be called before facts.putIfAbsent() for 
visibility of rowIndex from concurrent readers.
+      key.setRowIndex(rowIndex);
+      IncrementalIndexRow prev = facts.putIfAbsent(key, key);
+      return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : 
prev.getRowIndex();
+    }
+
+    @Override
+    public void clear()
+    {
+      facts.clear();
+    }
+  }
+
+  static final class PlainFactsHolder implements FactsHolder
+  {
+    private final ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> 
facts;
+
+    private final Comparator<IncrementalIndexRow> 
incrementalIndexRowComparator;
+
+    public PlainFactsHolder(Comparator<IncrementalIndexRow> 
incrementalIndexRowComparator)
+    {
+      this.facts = new ConcurrentSkipListMap<>();
+      this.incrementalIndexRowComparator = incrementalIndexRowComparator;
+    }
+
+    @Override
+    public int getPriorIndex(IncrementalIndexRow key)
+    {
+      // always return EMPTY_ROW_INDEX to indicate that no prior key cause we 
always add new row
+      return IncrementalIndexRow.EMPTY_ROW_INDEX;
+    }
+
+    @Override
+    public long getMinTimeMillis()
+    {
+      return facts.firstKey();
+    }
+
+    @Override
+    public long getMaxTimeMillis()
+    {
+      return facts.lastKey();
+    }
+
+    @Override
+    public Iterator<IncrementalIndexRow> iterator(boolean descending)
+    {
+      if (descending) {
+        return timeOrderedConcat(facts.descendingMap().values(), 
true).iterator();
+      }
+      return timeOrderedConcat(facts.values(), false).iterator();
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, 
long timeStart, long timeEnd)
+    {
+      ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap = 
facts.subMap(timeStart, timeEnd);
+      final ConcurrentMap<Long, Deque<IncrementalIndexRow>> rangeMap = 
descending ? subMap.descendingMap() : subMap;
+      return timeOrderedConcat(rangeMap.values(), descending);
+    }
+
+    private Iterable<IncrementalIndexRow> timeOrderedConcat(
+        final Iterable<Deque<IncrementalIndexRow>> iterable,
+        final boolean descending
+    )
+    {
+      return () -> Iterators.concat(
+          Iterators.transform(
+              iterable.iterator(),
+              input -> descending ? input.descendingIterator() : 
input.iterator()
+          )
+      );
+    }
+
+    private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
+        final Collection<Deque<IncrementalIndexRow>> rowGroups
+    )
+    {
+      return rowGroups.stream()
+                      .flatMap(Collection::stream)
+                      .sorted(incrementalIndexRowComparator);
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> keySet()
+    {
+      return timeOrderedConcat(facts.values(), false);
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
+    }
+
+    @Override
+    public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
+    {
+      Long time = key.getTimestamp();
+      Deque<IncrementalIndexRow> rows = facts.get(time);
+      if (rows == null) {
+        facts.putIfAbsent(time, new ConcurrentLinkedDeque<>());
+        // in race condition, rows may be put by other thread, so always get 
latest status from facts
+        rows = facts.get(time);
+      }
+      // setRowIndex() must be called before rows.add() for visibility of 
rowIndex from concurrent readers.
+      key.setRowIndex(rowIndex);
+      rows.add(key);
+      // always return EMPTY_ROW_INDEX to indicate that we always add new row
+      return IncrementalIndexRow.EMPTY_ROW_INDEX;
+    }
+
+    @Override
+    public void clear()
+    {
+      facts.clear();
+    }
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index c60520fbfa4..fab9357b051 100644
--- 
a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -480,7 +480,6 @@ public class AggregationTestHelper implements Closeable
           outDir,
           minTimestamp,
           gran,
-          true,
           maxRowCount,
           rollup
       );
@@ -498,7 +497,6 @@ public class AggregationTestHelper implements Closeable
       File outDir,
       long minTimestamp,
       Granularity gran,
-      boolean deserializeComplexMetrics,
       int maxRowCount,
       boolean rollup
   ) throws Exception
@@ -517,7 +515,6 @@ public class AggregationTestHelper implements Closeable
                   .withRollup(rollup)
                   .build()
           )
-          .setDeserializeComplexMetrics(deserializeComplexMetrics)
           .setMaxRowCount(maxRowCount)
           .build();
 
@@ -538,7 +535,6 @@ public class AggregationTestHelper implements Closeable
                       .withRollup(rollup)
                       .build()
               )
-              .setDeserializeComplexMetrics(deserializeComplexMetrics)
               .setMaxRowCount(maxRowCount)
               .build();
         }
@@ -594,7 +590,6 @@ public class AggregationTestHelper implements Closeable
       final AggregatorFactory[] metrics,
       long minTimestamp,
       Granularity gran,
-      boolean deserializeComplexMetrics,
       int maxRowCount,
       boolean rollup
   ) throws Exception
@@ -609,7 +604,6 @@ public class AggregationTestHelper implements Closeable
                 .withRollup(rollup)
                 .build()
         )
-        .setDeserializeComplexMetrics(deserializeComplexMetrics)
         .setMaxRowCount(maxRowCount)
         .build();
 
@@ -636,7 +630,6 @@ public class AggregationTestHelper implements Closeable
       final AggregatorFactory[] metrics,
       long minTimestamp,
       Granularity gran,
-      boolean deserializeComplexMetrics,
       int maxRowCount,
       boolean rollup
   ) throws Exception
@@ -648,7 +641,6 @@ public class AggregationTestHelper implements Closeable
         metrics,
         minTimestamp,
         gran,
-        deserializeComplexMetrics,
         maxRowCount,
         rollup
     );
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
index 2e516cebf63..b54a5e5db3b 100644
--- 
a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
@@ -107,7 +107,6 @@ public class StringColumnAggregationTest
         new AggregatorFactory[]{new CountAggregatorFactory("count")},
         0,
         Granularities.NONE,
-        false,
         100,
         false
     );
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
 
b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
index 786a6fc176c..1e9bca48216 100644
--- 
a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
+++ 
b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
@@ -101,7 +101,6 @@ public class SimpleTestIndex
           },
           0,
           Granularities.NONE,
-          false,
           100,
           false
       );
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 391a490e2da..2dfe2b7dfbf 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -138,7 +138,6 @@ public class GroupByLimitPushDownInsufficientBufferTest 
extends InitializedNullH
                 .withRollup(withRollup)
                 .build()
         )
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .build();
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 8cd451eb009..28ff970efdf 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -154,7 +154,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest
                 .withRollup(withRollup)
                 .build()
         )
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .build();
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index 05aa06db0a5..0284edd4127 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -130,7 +130,6 @@ public class GroupByMultiSegmentTest
                 .withRollup(withRollup)
                 .build()
         )
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .build();
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
index 3892a2018a0..266691ee294 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
@@ -138,7 +138,6 @@ public class GroupByQueryRunnerFactoryTest
   {
     IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(5000)
         .build();
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 8bf2785e2b7..06802b55694 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -132,7 +132,6 @@ public class NestedQueryPushDownTest extends 
InitializedNullHandlingTest
                 ))
                 .build()
         )
-        .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
         .build();
   }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
index 4f7eb9b8317..d7b51ab0f8b 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
@@ -180,7 +180,7 @@ public class IncrementalIndexCreator implements Closeable
    *
    * For example, for a parameterized test with the following constrctor:
    * {@code
-   *   public IncrementalIndexTest(String indexType, String mode, boolean 
deserializeComplexMetrics)
+   *   public IncrementalIndexTest(String indexType, String mode)
    *   {
    *     ...
    *   }
@@ -188,12 +188,11 @@ public class IncrementalIndexCreator implements Closeable
    *
    * we can test all the input combinations as follows:
    * {@code
-   *   @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
+   *   @Parameterized.Parameters(name = "{index}: {0}, {1}")
    *   public static Collection<?> constructorFeeder()
    *   {
    *     return IncrementalIndexCreator.indexTypeCartesianProduct(
-   *         ImmutableList.of("rollup", "plain"),
-   *         ImmutableList.of(true, false)
+   *         ImmutableList.of("rollup", "plain")
    *     );
    *   }
    * }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
index 0b45026e829..c83c4f0da4c 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
@@ -67,8 +67,7 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
 
   public IncrementalIndexTest(
       String indexType,
-      String mode,
-      boolean deserializeComplexMetrics
+      String mode
   ) throws JsonProcessingException
   {
     this.mode = mode;
@@ -101,18 +100,16 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
         .build();
     indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, 
(builder, args) -> builder
         .setIndexSchema(schema)
-        .setDeserializeComplexMetrics(deserializeComplexMetrics)
         .setMaxRowCount(1_000_000)
         .build())
     );
   }
 
-  @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
+  @Parameterized.Parameters(name = "{index}: {0}, {1}")
   public static Collection<?> constructorFeeder()
   {
     return IncrementalIndexCreator.indexTypeCartesianProduct(
-        ImmutableList.of("rollup", "plain"),
-        ImmutableList.of(true, false)
+        ImmutableList.of("rollup", "plain")
     );
   }
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index 02d43e27506..3d0674d2845 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -110,18 +110,12 @@ public class OnheapIncrementalIndexBenchmark extends 
AbstractBenchmark
 
     public MapIncrementalIndex(
         IncrementalIndexSchema incrementalIndexSchema,
-        boolean deserializeComplexMetrics,
-        boolean concurrentEventAdd,
-        boolean sortFacts,
         int maxRowCount,
         long maxBytesInMemory
     )
     {
       super(
           incrementalIndexSchema,
-          deserializeComplexMetrics,
-          concurrentEventAdd,
-          sortFacts,
           maxRowCount,
           maxBytesInMemory,
           false,
@@ -143,9 +137,6 @@ public class OnheapIncrementalIndexBenchmark extends 
AbstractBenchmark
               .withQueryGranularity(gran)
               .withMetrics(metrics)
               .build(),
-          true,
-          false,
-          true,
           maxRowCount,
           maxBytesInMemory,
           false,
@@ -190,7 +181,7 @@ public class OnheapIncrementalIndexBenchmark extends 
AbstractBenchmark
         for (int i = 0; i < metrics.length; i++) {
           final AggregatorFactory agg = metrics[i];
           aggs[i] = agg.factorize(
-              makeColumnSelectorFactory(agg, rowSupplier, 
getDeserializeComplexMetrics())
+              makeColumnSelectorFactory(agg, rowSupplier)
           );
         }
         Integer rowIndex;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to