This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.0 by this push:
     new de0569bb49f IncrementalIndex#add is no longer thread-safe. (#15697) (#15793)
de0569bb49f is described below

commit de0569bb49f87b916e3f793cda13826eb041e750
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Jan 30 21:30:39 2024 +0530

    IncrementalIndex#add is no longer thread-safe. (#15697) (#15793)
    
    * IncrementalIndex#add is no longer thread-safe.
    
    Following #14866, there is no longer a reason for IncrementalIndex#add
    to be thread-safe.
    
    It turns out it already was not using its selectors in a thread-safe way,
    as exposed by #15615 making `testMultithreadAddFactsUsingExpressionAndJavaScript`
    in `IncrementalIndexIngestionTest` flaky. Note that this problem isn't
    new: Strings have been stored in the dimension selectors for some time,
    but we didn't have a test that checked for that case; we only have
    this test that checks for concurrent adds involving numeric selectors.
    
    At any rate, this patch changes OnheapIncrementalIndex to no longer try
    to offer a thread-safe "add" method. It also improves performance a bit
    by adding a row ID supplier to the selectors it uses to read InputRows,
    meaning that it can get the benefit of caching values inside the selectors.
    
    This patch also:
    
    1) Adds synchronization to HyperUniquesAggregator and CardinalityAggregator,
       which the similar datasketches versions already have. This is done to
       help them adhere to the contract of Aggregator: concurrent calls to
       "aggregate" and "get" must be thread-safe.
    
    2) Updates OnHeapIncrementalIndexBenchmark to use JMH and moves it to the
       druid-benchmarks module.
    
    * Spelling.
    
    * Changes from static analysis.
    
    * Fix javadoc.
    
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../indexing/OnheapIncrementalIndexBenchmark.java  | 335 ++++++++++++++++
 .../org/apache/druid/indexer/InputRowSerde.java    |   7 +-
 .../cardinality/CardinalityAggregator.java         |   8 +-
 .../hyperloglog/HyperUniquesAggregator.java        |   8 +-
 .../segment/RowBasedColumnSelectorFactory.java     |   4 +-
 .../segment/incremental/IncrementalIndex.java      | 107 ++++--
 .../incremental/OnheapIncrementalIndex.java        | 133 ++++---
 .../druid/segment/data/IncrementalIndexTest.java   |  71 +---
 .../incremental/IncrementalIndexIngestionTest.java | 152 --------
 .../OnheapIncrementalIndexBenchmark.java           | 428 ---------------------
 10 files changed, 502 insertions(+), 751 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/OnheapIncrementalIndexBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/OnheapIncrementalIndexBenchmark.java
new file mode 100644
index 00000000000..7c67e7895ab
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/OnheapIncrementalIndexBenchmark.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.indexing;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Benchmark for {@link OnheapIncrementalIndex} doing queries and adds simultaneously.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+public class OnheapIncrementalIndexBenchmark
+{
+  static final int DIMENSION_COUNT = 5;
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  /**
+   * Number of index and query tasks.
+   */
+  private final int taskCount = 30;
+
+  /**
+   * Number of elements to add for each index task.
+   */
+  private final int elementsPerAddTask = 1 << 15;
+
+  /**
+   * Number of query tasks to run simultaneously.
+   */
+  private final int queryThreads = 4;
+
+  private AggregatorFactory[] factories;
+  private IncrementalIndex incrementalIndex;
+  private ListeningExecutorService indexExecutor;
+  private ListeningExecutorService queryExecutor;
+
+  private static MapBasedInputRow getLongRow(long timestamp, int rowID, int 
dimensionCount)
+  {
+    List<String> dimensionList = new ArrayList<String>(dimensionCount);
+    ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+    for (int i = 0; i < dimensionCount; i++) {
+      String dimName = StringUtils.format("Dim_%d", i);
+      dimensionList.add(dimName);
+      builder.put(dimName, Integer.valueOf(rowID).longValue());
+    }
+    return new MapBasedInputRow(timestamp, dimensionList, builder.build());
+  }
+
+  @Setup(Level.Trial)
+  public void setupFactories()
+  {
+    final ArrayList<AggregatorFactory> ingestAggregatorFactories = new 
ArrayList<>(DIMENSION_COUNT + 1);
+    ingestAggregatorFactories.add(new CountAggregatorFactory("rows"));
+    for (int i = 0; i < DIMENSION_COUNT; ++i) {
+      ingestAggregatorFactories.add(
+          new LongSumAggregatorFactory(
+              StringUtils.format("sumResult%s", i),
+              StringUtils.format("Dim_%s", i)
+          )
+      );
+      ingestAggregatorFactories.add(
+          new DoubleSumAggregatorFactory(
+              StringUtils.format("doubleSumResult%s", i),
+              StringUtils.format("Dim_%s", i)
+          )
+      );
+    }
+    factories = ingestAggregatorFactories.toArray(new AggregatorFactory[0]);
+  }
+
+  @Setup(Level.Trial)
+  public void setupExecutors()
+  {
+    indexExecutor = MoreExecutors.listeningDecorator(
+        Executors.newSingleThreadExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(false)
+                .setNameFormat("index-executor-%d")
+                .setPriority(Thread.MIN_PRIORITY)
+                .build()
+        )
+    );
+    queryExecutor = MoreExecutors.listeningDecorator(
+        Executors.newFixedThreadPool(
+            queryThreads,
+            new ThreadFactoryBuilder()
+                .setDaemon(false)
+                .setNameFormat("query-executor-%d")
+                .build()
+        )
+    );
+  }
+
+  @Setup(Level.Invocation)
+  public void setupIndex()
+      throws NoSuchMethodException, InvocationTargetException, 
InstantiationException, IllegalAccessException
+  {
+    final Constructor<? extends OnheapIncrementalIndex> constructor =
+        OnheapIncrementalIndex.class.getDeclaredConstructor(
+            IncrementalIndexSchema.class,
+            int.class,
+            long.class,
+            boolean.class,
+            boolean.class
+        );
+
+    constructor.setAccessible(true);
+
+    this.incrementalIndex =
+        constructor.newInstance(
+            new 
IncrementalIndexSchema.Builder().withMetrics(factories).build(),
+            elementsPerAddTask * taskCount,
+            1_000_000_000L,
+            false,
+            false
+        );
+  }
+
+  @TearDown(Level.Invocation)
+  public void tearDownIndex()
+  {
+    incrementalIndex.close();
+    incrementalIndex = null;
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDownExecutors() throws InterruptedException
+  {
+    indexExecutor.shutdown();
+    queryExecutor.shutdown();
+    if (!indexExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+      throw new ISE("Could not shut down indexExecutor");
+    }
+    if (!queryExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+      throw new ISE("Could not shut down queryExecutor");
+    }
+    indexExecutor = null;
+    queryExecutor = null;
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void concurrentAddRead() throws InterruptedException, 
ExecutionException
+  {
+    final ArrayList<AggregatorFactory> queryAggregatorFactories = new 
ArrayList<>(DIMENSION_COUNT + 1);
+    queryAggregatorFactories.add(new CountAggregatorFactory("rows"));
+    for (int i = 0; i < DIMENSION_COUNT; ++i) {
+      queryAggregatorFactories.add(
+          new LongSumAggregatorFactory(
+              StringUtils.format("sumResult%s", i),
+              StringUtils.format("sumResult%s", i)
+          )
+      );
+      queryAggregatorFactories.add(
+          new DoubleSumAggregatorFactory(
+              StringUtils.format("doubleSumResult%s", i),
+              StringUtils.format("doubleSumResult%s", i)
+          )
+      );
+    }
+
+    final long timestamp = System.currentTimeMillis();
+    final Interval queryInterval = 
Intervals.of("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
+    final List<ListenableFuture<?>> indexFutures = new ArrayList<>();
+    final List<ListenableFuture<?>> queryFutures = new ArrayList<>();
+    final Segment incrementalIndexSegment = new 
IncrementalIndexSegment(incrementalIndex, null);
+    final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
+        new TimeseriesQueryQueryToolChest(),
+        new TimeseriesQueryEngine(),
+        QueryRunnerTestHelper.NOOP_QUERYWATCHER
+    );
+    final AtomicInteger currentlyRunning = new AtomicInteger(0);
+    final AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
+    final AtomicBoolean someoneRan = new AtomicBoolean(false);
+    for (int j = 0; j < taskCount; j++) {
+      indexFutures.add(
+          indexExecutor.submit(
+              () -> {
+                currentlyRunning.incrementAndGet();
+                try {
+                  for (int i = 0; i < elementsPerAddTask; i++) {
+                    incrementalIndex.add(getLongRow(timestamp + i, 1, 
DIMENSION_COUNT));
+                  }
+                }
+                catch (IndexSizeExceededException e) {
+                  throw new RuntimeException(e);
+                }
+                currentlyRunning.decrementAndGet();
+                someoneRan.set(true);
+              }
+          )
+      );
+
+      queryFutures.add(
+          queryExecutor.submit(
+              () -> {
+                QueryRunner<Result<TimeseriesResultValue>> runner =
+                    new 
FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
+                        factory.createRunner(incrementalIndexSegment),
+                        factory.getToolchest()
+                    );
+                TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                              .dataSource("xxx")
+                                              .granularity(Granularities.ALL)
+                                              
.intervals(ImmutableList.of(queryInterval))
+                                              
.aggregators(queryAggregatorFactories)
+                                              .build();
+                List<Result<TimeseriesResultValue>> results = 
runner.run(QueryPlus.wrap(query)).toList();
+                for (Result<TimeseriesResultValue> result : results) {
+                  if (someoneRan.get()) {
+                    
Assert.assertTrue(result.getValue().getDoubleMetric("doubleSumResult0") > 0);
+                  }
+                }
+                if (currentlyRunning.get() > 0) {
+                  concurrentlyRan.set(true);
+                }
+              }
+          )
+      );
+
+    }
+    List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() 
+ indexFutures.size());
+    allFutures.addAll(queryFutures);
+    allFutures.addAll(indexFutures);
+    Futures.allAsList(allFutures).get();
+    QueryRunner<Result<TimeseriesResultValue>> runner = new 
FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
+        factory.createRunner(incrementalIndexSegment),
+        factory.getToolchest()
+    );
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("xxx")
+                                  .granularity(Granularities.ALL)
+                                  .intervals(ImmutableList.of(queryInterval))
+                                  .aggregators(queryAggregatorFactories)
+                                  .build();
+    List<Result<TimeseriesResultValue>> results = 
runner.run(QueryPlus.wrap(query)).toList();
+    final int expectedVal = elementsPerAddTask * taskCount;
+    for (Result<TimeseriesResultValue> result : results) {
+      Assert.assertEquals(elementsPerAddTask, 
result.getValue().getLongMetric("rows").intValue());
+      for (int i = 0; i < DIMENSION_COUNT; ++i) {
+        Assert.assertEquals(
+            StringUtils.format("Failed long sum on dimension %d", i),
+            expectedVal,
+            result.getValue().getLongMetric(StringUtils.format("sumResult%s", 
i)).intValue()
+        );
+        Assert.assertEquals(
+            StringUtils.format("Failed double sum on dimension %d", i),
+            expectedVal,
+            
result.getValue().getDoubleMetric(StringUtils.format("doubleSumResult%s", 
i)).intValue()
+        );
+      }
+    }
+  }
+}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
index 22b140222c7..40339d4b6df 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
@@ -293,7 +293,7 @@ public class InputRowSerde
   public static SerializeResult toBytes(
       final Map<String, IndexSerdeTypeHelper> typeHelperMap,
       final InputRow row,
-      AggregatorFactory[] aggs
+      final AggregatorFactory[] aggs
   )
   {
     try {
@@ -323,14 +323,15 @@ public class InputRowSerde
       }
 
       //writing all metrics
-      Supplier<InputRow> supplier = () -> row;
       WritableUtils.writeVInt(out, aggs.length);
       for (AggregatorFactory aggFactory : aggs) {
         String k = aggFactory.getName();
         writeString(k, out);
 
+        final IncrementalIndex.InputRowHolder holder = new 
IncrementalIndex.InputRowHolder();
+        holder.set(row);
         try (Aggregator agg = aggFactory.factorize(
-            IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, 
aggFactory, supplier)
+            IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, 
holder, aggFactory)
         )) {
           try {
             agg.aggregate();
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregator.java
index be665a3ce78..6df50bedf01 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregator.java
@@ -83,7 +83,7 @@ public class CardinalityAggregator implements Aggregator
   }
 
   @Override
-  public void aggregate()
+  public synchronized void aggregate()
   {
     if (byRow) {
       hashRow(selectorPluses, collector);
@@ -93,10 +93,10 @@ public class CardinalityAggregator implements Aggregator
   }
 
   @Override
-  public Object get()
+  public synchronized Object get()
   {
-    // Workaround for non-thread-safe use of HyperLogLogCollector.
-    // OnheapIncrementalIndex has a penchant for calling "aggregate" and "get" simultaneously.
+    // Must make a new collector duplicating the underlying buffer to ensure the object from "get" is usable
+    // in a thread-safe manner.
     return HyperLogLogCollector.makeCollectorSharingStorage(collector);
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java
index d4fba9dff87..ba850efe3cf 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java
@@ -39,7 +39,7 @@ public class HyperUniquesAggregator implements Aggregator
   }
 
   @Override
-  public void aggregate()
+  public synchronized void aggregate()
   {
     Object object = selector.getObject();
     if (object == null) {
@@ -53,13 +53,13 @@ public class HyperUniquesAggregator implements Aggregator
 
   @Nullable
   @Override
-  public Object get()
+  public synchronized Object get()
   {
     if (collector == null) {
       return null;
     }
-    // Workaround for non-thread-safe use of HyperLogLogCollector.
-    // OnheapIncrementalIndex has a penchant for calling "aggregate" and "get" simultaneously.
+    // Must make a new collector duplicating the underlying buffer to ensure the object from "get" is usable
+    // in a thread-safe manner.
     return HyperLogLogCollector.makeCollectorSharingStorage(collector);
   }
 
diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java
index 17e6e5daa7d..43ae6ae1464 100644
--- a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java
@@ -61,10 +61,10 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
   private final boolean useStringValueOfNullInLists;
 
   /**
-   * Package-private constructor for {@link RowBasedCursor}. Allows passing in a rowIdSupplier, which enables
+   * Full constructor for {@link RowBasedCursor}. Allows passing in a rowIdSupplier, which enables
    * column value reuse optimizations.
    */
-  RowBasedColumnSelectorFactory(
+  public RowBasedColumnSelectorFactory(
       final Supplier<T> rowSupplier,
       @Nullable final RowIdSupplier rowIdSupplier,
       final RowAdapter<T> adapter,
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index eca175267a7..6d00737d4ca 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -21,8 +21,8 @@ package org.apache.druid.segment.incremental;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -93,26 +93,40 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * In-memory, row-based data structure used to hold data during ingestion. Realtime tasks query this index using
+ * {@link IncrementalIndexStorageAdapter}.
+ *
+ * Concurrency model: {@link #add(InputRow)} and {@link #add(InputRow, boolean)} are not thread-safe, and must be
+ * called from a single thread or externally synchronized. However, the methods that support
+ * {@link IncrementalIndexStorageAdapter} are thread-safe, and may be called concurrently with each other, and with
+ * the "add" methods. This concurrency model supports real-time queries of the data in the index.
+ */
 public abstract class IncrementalIndex implements Iterable<Row>, Closeable, 
ColumnInspector
 {
   /**
    * Column selector used at ingestion time for inputs to aggregators.
    *
-   * @param agg                       the aggregator
-   * @param in                        ingestion-time input row supplier
+   * @param virtualColumns virtual columns
+   * @param inputRowHolder ingestion-time input row holder
+   * @param agg            the aggregator, or null to make a generic aggregator. Only required if the agg has
+   *                       {@link AggregatorFactory#getIntermediateType()} as {@link ValueType#COMPLEX}, because
+   *                       in this case we need to do some magic to ensure the correct values show up.
+   *
    * @return column selector factory
    */
   public static ColumnSelectorFactory makeColumnSelectorFactory(
       final VirtualColumns virtualColumns,
-      final AggregatorFactory agg,
-      final Supplier<InputRow> in
+      final InputRowHolder inputRowHolder,
+      @Nullable final AggregatorFactory agg
   )
   {
     // we use RowSignature.empty() because ColumnInspector here should be the InputRow schema, not the
     // IncrementalIndex schema, because we are reading values from the InputRow
-    final RowBasedColumnSelectorFactory<InputRow> baseSelectorFactory = 
RowBasedColumnSelectorFactory.create(
+    final RowBasedColumnSelectorFactory<InputRow> baseSelectorFactory = new 
RowBasedColumnSelectorFactory<>(
+        inputRowHolder::getRow,
+        inputRowHolder::getRowId,
         RowAdapters.standardRow(),
-        in,
         RowSignature.empty(),
         true,
         true
@@ -125,7 +139,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
       {
         final ColumnValueSelector selector = 
baseSelectorFactory.makeColumnValueSelector(column);
 
-        if (!agg.getIntermediateType().is(ValueType.COMPLEX)) {
+        if (agg == null || !agg.getIntermediateType().is(ValueType.COMPLEX)) {
           return selector;
         } else {
           // Wrap selector in a special one that uses ComplexMetricSerde to modify incoming objects.
@@ -175,13 +189,13 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
             public Object getObject()
             {
               // Here is where the magic happens: read from "in" directly, don't go through the normal "selector".
-              return extractor.extractValue(in.get(), column, agg);
+              return extractor.extractValue(inputRowHolder.getRow(), column, 
agg);
             }
 
             @Override
             public void inspectRuntimeShape(RuntimeShapeInspector inspector)
             {
-              inspector.visit("in", in);
+              inspector.visit("inputRowHolder", inputRowHolder);
               inspector.visit("selector", selector);
               inspector.visit("extractor", extractor);
             }
@@ -229,13 +243,10 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
 
   private final boolean useSchemaDiscovery;
 
-  // This is modified on add() in a critical section.
-  private final ThreadLocal<InputRow> in = new ThreadLocal<>();
-  private final Supplier<InputRow> rowSupplier = in::get;
+  private final InputRowHolder inputRowHolder = new InputRowHolder();
 
   private volatile DateTime maxIngestedEventTime;
 
-
   /**
    * @param incrementalIndexSchema    the schema to use for incremental index
    * @param preserveExistingMetrics   When set to true, for any row that already has metric
@@ -276,7 +287,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
         this.rollup
     );
 
-    initAggs(metrics, rowSupplier);
+    initAggs(metrics, inputRowHolder);
 
     for (AggregatorFactory metric : metrics) {
       MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric);
@@ -332,15 +343,13 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
 
   protected abstract void initAggs(
       AggregatorFactory[] metrics,
-      Supplier<InputRow> rowSupplier
+      InputRowHolder rowSupplier
   );
 
-  // Note: This method needs to be thread safe.
+  // Note: This method does not need to be thread safe.
   protected abstract AddToFactsResult addToFacts(
-      InputRow row,
       IncrementalIndexRow key,
-      ThreadLocal<InputRow> rowContainer,
-      Supplier<InputRow> rowSupplier,
+      InputRowHolder inputRowHolder,
       boolean skipMaxRowsInMemoryCheck
   ) throws IndexSizeExceededException;
 
@@ -411,6 +420,34 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
     }
   }
 
+  public static class InputRowHolder
+  {
+    @Nullable
+    private InputRow row;
+    private long rowId = -1;
+
+    public void set(final InputRow row)
+    {
+      this.row = row;
+      this.rowId++;
+    }
+
+    public void unset()
+    {
+      this.row = null;
+    }
+
+    public InputRow getRow()
+    {
+      return Preconditions.checkNotNull(row, "row");
+    }
+
+    public long getRowId()
+    {
+      return rowId;
+    }
+  }
+
   public boolean isRollup()
   {
     return rollup;
@@ -473,14 +510,14 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
   /**
    * Adds a new row.  The row might correspond with another row that already exists, in which case this will
    * update that row instead of inserting a new one.
-   * <p>
-   * <p>
-   * Calls to add() are thread safe.
-   * <p>
+   *
+   * Not thread-safe.
    *
    * @param row the row of data to add
    *
    * @return the number of rows in the data set after adding the InputRow. If any parse failure occurs, a {@link ParseException} is returned in {@link IncrementalIndexAddResult}.
+   *
+   * @throws IndexSizeExceededException this exception is thrown once it reaches max rows limit and skipMaxRowsInMemoryCheck is set to false.
    */
   public IncrementalIndexAddResult add(InputRow row) throws 
IndexSizeExceededException
   {
@@ -490,25 +527,24 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
   /**
    * Adds a new row.  The row might correspond with another row that already exists, in which case this will
    * update that row instead of inserting a new one.
-   * <p>
-   * <p>
-   * Calls to add() are thread safe.
-   * <p>
+   *
+   * Not thread-safe.
    *
    * @param row                      the row of data to add
-   * @param skipMaxRowsInMemoryCheck whether or not to skip the check of rows exceeding the max rows limit
+   * @param skipMaxRowsInMemoryCheck whether or not to skip the check of rows exceeding the max rows or bytes limit
+   *
    * @return the number of rows in the data set after adding the InputRow. If any parse failure occurs, a {@link ParseException} is returned in {@link IncrementalIndexAddResult}.
+   *
    * @throws IndexSizeExceededException this exception is thrown once it reaches max rows limit and skipMaxRowsInMemoryCheck is set to false.
    */
   public IncrementalIndexAddResult add(InputRow row, boolean 
skipMaxRowsInMemoryCheck)
       throws IndexSizeExceededException
   {
     IncrementalIndexRowResult incrementalIndexRowResult = 
toIncrementalIndexRow(row);
+    inputRowHolder.set(row);
     final AddToFactsResult addToFactsResult = addToFacts(
-        row,
         incrementalIndexRowResult.getIncrementalIndexRow(),
-        in,
-        rowSupplier,
+        inputRowHolder,
         skipMaxRowsInMemoryCheck
     );
     updateMaxIngestedTime(row.getTimestamp());
@@ -517,6 +553,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
         incrementalIndexRowResult.getParseExceptionMessages(),
         addToFactsResult.getParseExceptionMessages()
     );
+    inputRowHolder.unset();
     return new IncrementalIndexAddResult(
         addToFactsResult.getRowCount(),
         addToFactsResult.getBytesInMemory(),
@@ -1019,11 +1056,11 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
   }
 
   protected ColumnSelectorFactory makeColumnSelectorFactory(
-      final AggregatorFactory agg,
-      final Supplier<InputRow> in
+      @Nullable final AggregatorFactory agg,
+      final InputRowHolder in
   )
   {
-    return makeColumnSelectorFactory(virtualColumns, agg, in);
+    return makeColumnSelectorFactory(virtualColumns, in, agg);
   }
 
   protected final Comparator<IncrementalIndexRow> dimsComparator()
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 3449226e4ce..0103dc7b729 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -21,12 +21,11 @@ package org.apache.druid.segment.incremental;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Supplier;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedRow;
 import org.apache.druid.data.input.Row;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -42,6 +41,7 @@ import org.apache.druid.segment.DimensionHandler;
 import org.apache.druid.segment.DimensionIndexer;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
@@ -118,10 +118,17 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
    */
   private final boolean useMaxMemoryEstimates;
 
+  /**
+   * Aggregator name -> column selector factory for that aggregator.
+   */
   @Nullable
-  private volatile Map<String, ColumnSelectorFactory> selectors;
+  private Map<String, ColumnSelectorFactory> selectors;
+  /**
+   * Aggregator name -> column selector factory for the combining version of 
that aggregator. Only set when
+   * {@link #preserveExistingMetrics} is true.
+   */
   @Nullable
-  private volatile Map<String, ColumnSelectorFactory> combiningAggSelectors;
+  private Map<String, ColumnSelectorFactory> combiningAggSelectors;
   @Nullable
   private String outOfRowsReason = null;
 
@@ -190,34 +197,49 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
   @Override
   protected void initAggs(
       final AggregatorFactory[] metrics,
-      final Supplier<InputRow> rowSupplier
+      final InputRowHolder inputRowHolder
   )
   {
+    // All non-complex aggregators share a column selector factory. Helps with 
value reuse.
+    ColumnSelectorFactory nonComplexColumnSelectorFactory = null;
     selectors = new HashMap<>();
     combiningAggSelectors = new HashMap<>();
     for (AggregatorFactory agg : metrics) {
-      selectors.put(
-          agg.getName(),
-          new CachingColumnSelectorFactory(makeColumnSelectorFactory(agg, 
rowSupplier))
-      );
-      if (preserveExistingMetrics) {
-        AggregatorFactory combiningAgg = agg.getCombiningFactory();
-        combiningAggSelectors.put(
-            combiningAgg.getName(),
-            new CachingColumnSelectorFactory(
-                makeColumnSelectorFactory(combiningAgg, rowSupplier)
-            )
-        );
+      final ColumnSelectorFactory factory;
+      if (agg.getIntermediateType().is(ValueType.COMPLEX)) {
+        factory = new 
CachingColumnSelectorFactory(makeColumnSelectorFactory(agg, inputRowHolder));
+      } else {
+        if (nonComplexColumnSelectorFactory == null) {
+          nonComplexColumnSelectorFactory =
+              new CachingColumnSelectorFactory(makeColumnSelectorFactory(null, 
inputRowHolder));
+        }
+        factory = nonComplexColumnSelectorFactory;
+      }
+      selectors.put(agg.getName(), factory);
+    }
+
+    if (preserveExistingMetrics) {
+      for (AggregatorFactory agg : metrics) {
+        final AggregatorFactory combiningAgg = agg.getCombiningFactory();
+        final ColumnSelectorFactory factory;
+        if (combiningAgg.getIntermediateType().is(ValueType.COMPLEX)) {
+          factory = new 
CachingColumnSelectorFactory(makeColumnSelectorFactory(combiningAgg, 
inputRowHolder));
+        } else {
+          if (nonComplexColumnSelectorFactory == null) {
+            nonComplexColumnSelectorFactory =
+                new 
CachingColumnSelectorFactory(makeColumnSelectorFactory(null, inputRowHolder));
+          }
+          factory = nonComplexColumnSelectorFactory;
+        }
+        combiningAggSelectors.put(combiningAgg.getName(), factory);
       }
     }
   }
 
   @Override
   protected AddToFactsResult addToFacts(
-      InputRow row,
       IncrementalIndexRow key,
-      ThreadLocal<InputRow> rowContainer,
-      Supplier<InputRow> rowSupplier,
+      InputRowHolder inputRowHolder,
       boolean skipMaxRowsInMemoryCheck
   ) throws IndexSizeExceededException
   {
@@ -230,7 +252,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     final AtomicLong totalSizeInBytes = getBytesInMemory();
     if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
       aggs = concurrentGet(priorIndex);
-      long aggSizeDelta = doAggregate(metrics, aggs, rowContainer, row, 
parseExceptionMessages);
+      long aggSizeDelta = doAggregate(metrics, aggs, inputRowHolder, 
parseExceptionMessages);
       totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta);
     } else {
       if (preserveExistingMetrics) {
@@ -238,8 +260,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex
       } else {
         aggs = new Aggregator[metrics.length];
       }
-      long aggSizeForRow = factorizeAggs(metrics, aggs, rowContainer, row);
-      aggSizeForRow += doAggregate(metrics, aggs, rowContainer, row, 
parseExceptionMessages);
+      long aggSizeForRow = factorizeAggs(metrics, aggs);
+      aggSizeForRow += doAggregate(metrics, aggs, inputRowHolder, 
parseExceptionMessages);
 
       final int rowIndex = indexIncrement.getAndIncrement();
       concurrentSet(rowIndex, aggs);
@@ -258,15 +280,7 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
       if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
         numEntries.incrementAndGet();
       } else {
-        // this should never happen. Previously, this would happen in a race 
condition involving multiple write threads
-        // for GroupBy v1 strategy, but it is no more, so this code needs the 
concurrency model reworked in the future
-        parseExceptionMessages.clear();
-        aggs = concurrentGet(prev);
-        aggSizeForRow = doAggregate(metrics, aggs, rowContainer, row, 
parseExceptionMessages);
-
-        // Free up the misfire
-        concurrentRemove(rowIndex);
-        // This is expected to occur ~80% of the time in the worst scenarios
+        throw DruidException.defensive("Encountered existing fact entry for 
new key, possible concurrent add?");
       }
 
       // For a new key, row size = key size + aggregator size + overhead
@@ -295,13 +309,10 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
    */
   private long factorizeAggs(
       AggregatorFactory[] metrics,
-      Aggregator[] aggs,
-      ThreadLocal<InputRow> rowContainer,
-      InputRow row
+      Aggregator[] aggs
   )
   {
     long totalInitialSizeBytes = 0L;
-    rowContainer.set(row);
     final long aggReferenceSize = Long.BYTES;
     for (int i = 0; i < metrics.length; i++) {
       final AggregatorFactory agg = metrics[i];
@@ -328,7 +339,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
         }
       }
     }
-    rowContainer.set(null);
     return totalInitialSizeBytes;
   }
 
@@ -342,42 +352,44 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
   private long doAggregate(
       AggregatorFactory[] metrics,
       Aggregator[] aggs,
-      ThreadLocal<InputRow> rowContainer,
-      InputRow row,
+      InputRowHolder inputRowHolder,
       List<String> parseExceptionsHolder
   )
   {
-    rowContainer.set(row);
     long totalIncrementalBytes = 0L;
     for (int i = 0; i < metrics.length; i++) {
       final Aggregator agg;
-      if (preserveExistingMetrics && row instanceof MapBasedRow && 
((MapBasedRow) row).getEvent().containsKey(metrics[i].getName())) {
+      if (preserveExistingMetrics
+          && inputRowHolder.getRow() instanceof MapBasedRow
+          && ((MapBasedRow) 
inputRowHolder.getRow()).getEvent().containsKey(metrics[i].getName())) {
         agg = aggs[i + metrics.length];
       } else {
         agg = aggs[i];
       }
-      synchronized (agg) {
-        try {
-          if (useMaxMemoryEstimates) {
-            agg.aggregate();
-          } else {
-            totalIncrementalBytes += agg.aggregateWithSize();
-          }
+      try {
+        if (useMaxMemoryEstimates) {
+          agg.aggregate();
+        } else {
+          totalIncrementalBytes += agg.aggregateWithSize();
         }
-        catch (ParseException e) {
-          // "aggregate" can throw ParseExceptions if a selector expects 
something but gets something else.
-          if (preserveExistingMetrics) {
-            log.warn(e, "Failing ingestion as preserveExistingMetrics is 
enabled but selector of aggregator[%s] recieved incompatible type.", 
metrics[i].getName());
-            throw e;
-          } else {
-            log.debug(e, "Encountered parse error, skipping aggregator[%s].", 
metrics[i].getName());
-            parseExceptionsHolder.add(e.getMessage());
-          }
+      }
+      catch (ParseException e) {
+        // "aggregate" can throw ParseExceptions if a selector expects 
something but gets something else.
+        if (preserveExistingMetrics) {
+          log.warn(
+              e,
+              "Failing ingestion as preserveExistingMetrics is enabled but 
selector of aggregator[%s] received "
+              + "incompatible type.",
+              metrics[i].getName()
+          );
+          throw e;
+        } else {
+          log.debug(e, "Encountered parse error, skipping aggregator[%s].", 
metrics[i].getName());
+          parseExceptionsHolder.add(e.getMessage());
         }
       }
     }
 
-    rowContainer.set(null);
     return totalIncrementalBytes;
   }
 
@@ -409,11 +421,6 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
     aggregators.put(offset, value);
   }
 
-  protected void concurrentRemove(int offset)
-  {
-    aggregators.remove(offset);
-  }
-
   @Override
   public boolean canAppendRow()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index e27b84da422..4a104510f8f 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -36,7 +36,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.guice.NestedDataModule;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -87,9 +86,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -463,11 +460,11 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     final IncrementalIndex index = indexCreator.createIndex(
         (Object) ingestAggregatorFactories.toArray(new AggregatorFactory[0])
     );
-    final int concurrentThreads = 2;
+    final int addThreads = 1;
     final int elementsPerThread = 10_000;
     final ListeningExecutorService indexExecutor = 
MoreExecutors.listeningDecorator(
         Executors.newFixedThreadPool(
-            concurrentThreads,
+            addThreads,
             new ThreadFactoryBuilder()
                 .setDaemon(false)
                 .setNameFormat("index-executor-%d")
@@ -477,7 +474,7 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     );
     final ListeningExecutorService queryExecutor = 
MoreExecutors.listeningDecorator(
         Executors.newFixedThreadPool(
-            concurrentThreads,
+            addThreads,
             new ThreadFactoryBuilder()
                 .setDaemon(false)
                 .setNameFormat("query-executor-%d")
@@ -486,8 +483,8 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     );
     final long timestamp = System.currentTimeMillis();
     final Interval queryInterval = 
Intervals.of("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
-    final List<ListenableFuture<?>> indexFutures = 
Lists.newArrayListWithExpectedSize(concurrentThreads);
-    final List<ListenableFuture<?>> queryFutures = 
Lists.newArrayListWithExpectedSize(concurrentThreads);
+    final List<ListenableFuture<?>> indexFutures = 
Lists.newArrayListWithExpectedSize(addThreads);
+    final List<ListenableFuture<?>> queryFutures = 
Lists.newArrayListWithExpectedSize(addThreads);
     final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, 
null);
     final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
         new TimeseriesQueryQueryToolChest(),
@@ -498,9 +495,9 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     final AtomicInteger concurrentlyRan = new AtomicInteger(0);
     final AtomicInteger someoneRan = new AtomicInteger(0);
     final CountDownLatch startLatch = new CountDownLatch(1);
-    final CountDownLatch readyLatch = new CountDownLatch(concurrentThreads * 
2);
+    final CountDownLatch readyLatch = new CountDownLatch(addThreads * 2);
     final AtomicInteger queriesAccumualted = new AtomicInteger(0);
-    for (int j = 0; j < concurrentThreads; j++) {
+    for (int j = 0; j < addThreads; j++) {
       indexFutures.add(
           indexExecutor.submit(
               new Runnable()
@@ -577,7 +574,7 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
                         }
                     );
                     for (Double result : results) {
-                      final Integer maxValueExpected = someoneRan.get() + 
concurrentThreads;
+                      final int maxValueExpected = someoneRan.get() + 
addThreads;
                       if (maxValueExpected > 0) {
                         // Eventually consistent, but should be somewhere in 
that range
                         // Actual result is validated after all writes are 
guaranteed done.
@@ -617,70 +614,24 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     boolean isRollup = index.isRollup();
     for (Result<TimeseriesResultValue> result : results) {
       Assert.assertEquals(
-          elementsPerThread * (isRollup ? 1 : concurrentThreads),
+          elementsPerThread * (isRollup ? 1 : addThreads),
           result.getValue().getLongMetric("rows").intValue()
       );
       for (int i = 0; i < dimensionCount; ++i) {
         Assert.assertEquals(
             StringUtils.format("Failed long sum on dimension %d", i),
-            elementsPerThread * concurrentThreads,
+            elementsPerThread * addThreads,
             result.getValue().getLongMetric(StringUtils.format("sumResult%s", 
i)).intValue()
         );
         Assert.assertEquals(
             StringUtils.format("Failed double sum on dimension %d", i),
-            elementsPerThread * concurrentThreads,
+            elementsPerThread * addThreads,
             
result.getValue().getDoubleMetric(StringUtils.format("doubleSumResult%s", 
i)).intValue()
         );
       }
     }
   }
 
-  @Test
-  public void testConcurrentAdd() throws Exception
-  {
-    final IncrementalIndex index = indexCreator.createIndex((Object) 
DEFAULT_AGGREGATOR_FACTORIES);
-    final int threadCount = 10;
-    final int elementsPerThread = 200;
-    final int dimensionCount = 5;
-    ExecutorService executor = Execs.multiThreaded(threadCount, 
"IncrementalIndexTest-%d");
-    final long timestamp = System.currentTimeMillis();
-    final CountDownLatch latch = new CountDownLatch(threadCount);
-    for (int j = 0; j < threadCount; j++) {
-      executor.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              try {
-                for (int i = 0; i < elementsPerThread; i++) {
-                  index.add(getRow(timestamp + i, i, dimensionCount));
-                }
-              }
-              catch (Exception e) {
-                e.printStackTrace();
-              }
-              latch.countDown();
-            }
-          }
-      );
-    }
-    Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
-
-    boolean isRollup = index.isRollup();
-    Assert.assertEquals(dimensionCount, index.getDimensionNames().size());
-    Assert.assertEquals(elementsPerThread * (isRollup ? 1 : threadCount), 
index.size());
-    Iterator<Row> iterator = index.iterator();
-    int curr = 0;
-    while (iterator.hasNext()) {
-      Row row = iterator.next();
-      Assert.assertEquals(timestamp + (isRollup ? curr : curr / threadCount), 
row.getTimestampFromEpoch());
-      Assert.assertEquals(isRollup ? threadCount : 1, 
row.getMetric("count").intValue());
-      curr++;
-    }
-    Assert.assertEquals(elementsPerThread * (isRollup ? 1 : threadCount), 
curr);
-  }
-
   @Test
   public void testgetDimensions()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
index cde4c5c1fa3..4ef0a0c69d9 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
@@ -20,22 +20,16 @@
 package org.apache.druid.segment.incremental;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.guice.NestedDataModule;
 import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.js.JavaScriptConfig;
 import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregator;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.segment.CloserRule;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.easymock.EasyMock;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,9 +37,6 @@ import org.junit.runners.Parameterized;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(Parameterized.class)
 public class IncrementalIndexIngestionTest extends InitializedNullHandlingTest
@@ -73,149 +64,6 @@ public class IncrementalIndexIngestionTest extends 
InitializedNullHandlingTest
     return IncrementalIndexCreator.getAppendableIndexTypes();
   }
 
-  @Test
-  public void testMultithreadAddFacts() throws Exception
-  {
-    final IncrementalIndex index = indexCreator.createIndex(new 
IncrementalIndexSchema.Builder()
-        .withQueryGranularity(Granularities.MINUTE)
-        .withMetrics(new LongMaxAggregatorFactory("max", "max"))
-        .build()
-    );
-
-    final int addThreadCount = 2;
-    Thread[] addThreads = new Thread[addThreadCount];
-    for (int i = 0; i < addThreadCount; ++i) {
-      addThreads[i] = new Thread(new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          final Random random = ThreadLocalRandom.current();
-          try {
-            for (int j = 0; j < MAX_ROWS / addThreadCount; ++j) {
-              index.add(new MapBasedInputRow(
-                  0,
-                  Collections.singletonList("billy"),
-                  ImmutableMap.of("billy", random.nextLong(), "max", 1)
-              ));
-            }
-          }
-          catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-      addThreads[i].start();
-    }
-
-    final AtomicInteger checkFailedCount = new AtomicInteger(0);
-    Thread checkThread = new Thread(new Runnable()
-    {
-      @Override
-      public void run()
-      {
-        while (!Thread.interrupted()) {
-          for (IncrementalIndexRow row : index.getFacts().keySet()) {
-            if (index.getMetricLongValue(row.getRowIndex(), 0) != 1) {
-              checkFailedCount.addAndGet(1);
-            }
-          }
-        }
-      }
-    });
-    checkThread.start();
-
-    for (int i = 0; i < addThreadCount; ++i) {
-      addThreads[i].join();
-    }
-    checkThread.interrupt();
-
-    Assert.assertEquals(0, checkFailedCount.get());
-  }
-
-  @Test
-  public void testMultithreadAddFactsUsingExpressionAndJavaScript() throws 
Exception
-  {
-    final IncrementalIndex indexExpr = indexCreator.createIndex(
-        new IncrementalIndexSchema.Builder()
-            .withQueryGranularity(Granularities.MINUTE)
-            .withMetrics(new LongSumAggregatorFactory(
-                "oddnum",
-                null,
-                "if(value%2==1,1,0)",
-                TestExprMacroTable.INSTANCE
-            ))
-            .withRollup(true)
-            .build()
-    );
-
-    final IncrementalIndex indexJs = indexCreator.createIndex(
-        new IncrementalIndexSchema.Builder()
-            .withQueryGranularity(Granularities.MINUTE)
-            .withMetrics(new JavaScriptAggregatorFactory(
-                "oddnum",
-                ImmutableList.of("value"),
-                "function(current, value) { if (value%2==1) current = current 
+ 1; return current;}",
-                "function() {return 0;}",
-                "function(a, b) { return a + b;}",
-                JavaScriptConfig.getEnabledInstance()
-            ))
-            .withRollup(true)
-            .build()
-    );
-
-    final int addThreadCount = 2;
-    Thread[] addThreads = new Thread[addThreadCount];
-    for (int i = 0; i < addThreadCount; ++i) {
-      addThreads[i] = new Thread(new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          final Random random = ThreadLocalRandom.current();
-          try {
-            for (int j = 0; j < MAX_ROWS / addThreadCount; ++j) {
-              int randomInt = random.nextInt(100000);
-              MapBasedInputRow mapBasedInputRowExpr = new MapBasedInputRow(
-                  0,
-                  Collections.singletonList("billy"),
-                  ImmutableMap.of("billy", randomInt % 3, "value", randomInt)
-              );
-              MapBasedInputRow mapBasedInputRowJs = new MapBasedInputRow(
-                  0,
-                  Collections.singletonList("billy"),
-                  ImmutableMap.of("billy", randomInt % 3, "value", randomInt)
-              );
-              indexExpr.add(mapBasedInputRowExpr);
-              indexJs.add(mapBasedInputRowJs);
-            }
-          }
-          catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-      addThreads[i].start();
-    }
-
-    for (int i = 0; i < addThreadCount; ++i) {
-      addThreads[i].join();
-    }
-
-    long exprSum = 0;
-    long jsSum = 0;
-
-    for (IncrementalIndexRow row : indexExpr.getFacts().keySet()) {
-      exprSum += indexExpr.getMetricLongValue(row.getRowIndex(), 0);
-    }
-
-    for (IncrementalIndexRow row : indexJs.getFacts().keySet()) {
-      jsSum += indexJs.getMetricLongValue(row.getRowIndex(), 0);
-    }
-
-    Assert.assertEquals(exprSum, jsSum);
-  }
-
   @Test
   public void testOnHeapIncrementalIndexClose() throws Exception
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
deleted file mode 100644
index 3d0674d2845..00000000000
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.incremental;
-
-import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
-import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
-import com.carrotsearch.junitbenchmarks.Clock;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.query.Druids;
-import org.apache.druid.query.FinalizeResultsQueryRunner;
-import org.apache.druid.query.QueryPlus;
-import org.apache.druid.query.QueryRunner;
-import org.apache.druid.query.QueryRunnerFactory;
-import org.apache.druid.query.QueryRunnerTestHelper;
-import org.apache.druid.query.Result;
-import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.timeseries.TimeseriesQuery;
-import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
-import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
-import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
-import org.apache.druid.query.timeseries.TimeseriesResultValue;
-import org.apache.druid.segment.IncrementalIndexSegment;
-import org.apache.druid.segment.Segment;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Extending AbstractBenchmark means only runs if explicitly called
- */
-@RunWith(Parameterized.class)
-public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
-{
-  private static AggregatorFactory[] factories;
-  static final int DIMENSION_COUNT = 5;
-
-  static {
-
-    final ArrayList<AggregatorFactory> ingestAggregatorFactories = new 
ArrayList<>(DIMENSION_COUNT + 1);
-    ingestAggregatorFactories.add(new CountAggregatorFactory("rows"));
-    for (int i = 0; i < DIMENSION_COUNT; ++i) {
-      ingestAggregatorFactories.add(
-          new LongSumAggregatorFactory(
-              StringUtils.format("sumResult%s", i),
-              StringUtils.format("Dim_%s", i)
-          )
-      );
-      ingestAggregatorFactories.add(
-          new DoubleSumAggregatorFactory(
-              StringUtils.format("doubleSumResult%s", i),
-              StringUtils.format("Dim_%s", i)
-          )
-      );
-    }
-    factories = ingestAggregatorFactories.toArray(new AggregatorFactory[0]);
-  }
-
-  private static final class MapIncrementalIndex extends OnheapIncrementalIndex
-  {
-    private final AtomicInteger indexIncrement = new AtomicInteger(0);
-    ConcurrentHashMap<Integer, Aggregator[]> indexedMap = new 
ConcurrentHashMap<Integer, Aggregator[]>();
-
-    public MapIncrementalIndex(
-        IncrementalIndexSchema incrementalIndexSchema,
-        int maxRowCount,
-        long maxBytesInMemory
-    )
-    {
-      super(
-          incrementalIndexSchema,
-          maxRowCount,
-          maxBytesInMemory,
-          false,
-          true
-      );
-    }
-
-    public MapIncrementalIndex(
-        long minTimestamp,
-        Granularity gran,
-        AggregatorFactory[] metrics,
-        int maxRowCount,
-        long maxBytesInMemory
-    )
-    {
-      super(
-          new IncrementalIndexSchema.Builder()
-              .withMinTimestamp(minTimestamp)
-              .withQueryGranularity(gran)
-              .withMetrics(metrics)
-              .build(),
-          maxRowCount,
-          maxBytesInMemory,
-          false,
-          true
-      );
-    }
-
-    @Override
-    protected Aggregator[] concurrentGet(int offset)
-    {
-      // All get operations should be fine
-      return indexedMap.get(offset);
-    }
-
-    @Override
-    protected void concurrentSet(int offset, Aggregator[] value)
-    {
-      indexedMap.put(offset, value);
-    }
-
-    @Override
-    protected AddToFactsResult addToFacts(
-        InputRow row,
-        IncrementalIndexRow key,
-        ThreadLocal<InputRow> rowContainer,
-        Supplier<InputRow> rowSupplier,
-        boolean skipMaxRowsInMemoryCheck // ignore for benchmark
-    ) throws IndexSizeExceededException
-    {
-
-      final Integer priorIdex = getFacts().getPriorIndex(key);
-
-      Aggregator[] aggs;
-      final AggregatorFactory[] metrics = getMetrics();
-      final AtomicInteger numEntries = getNumEntries();
-      final AtomicLong sizeInBytes = getBytesInMemory();
-      if (null != priorIdex) {
-        aggs = indexedMap.get(priorIdex);
-      } else {
-        aggs = new Aggregator[metrics.length];
-
-        for (int i = 0; i < metrics.length; i++) {
-          final AggregatorFactory agg = metrics[i];
-          aggs[i] = agg.factorize(
-              makeColumnSelectorFactory(agg, rowSupplier)
-          );
-        }
-        Integer rowIndex;
-
-        do {
-          rowIndex = indexIncrement.incrementAndGet();
-        } while (null != indexedMap.putIfAbsent(rowIndex, aggs));
-
-
-        // Last ditch sanity checks
-        if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory)
-            && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
-          throw new IndexSizeExceededException("Maximum number of rows or max bytes reached");
-        }
-        final int prev = getFacts().putIfAbsent(key, rowIndex);
-        if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
-          numEntries.incrementAndGet();
-          sizeInBytes.incrementAndGet();
-        } else {
-          // We lost a race
-          aggs = indexedMap.get(prev);
-          // Free up the misfire
-          indexedMap.remove(rowIndex);
-          // This is expected to occur ~80% of the time in the worst scenarios
-        }
-      }
-
-      rowContainer.set(row);
-
-      for (Aggregator agg : aggs) {
-        synchronized (agg) {
-          agg.aggregate();
-        }
-      }
-
-      rowContainer.set(null);
-
-      return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), new ArrayList<>());
-    }
-
-    @Override
-    public int getLastRowIndex()
-    {
-      return indexIncrement.get() - 1;
-    }
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> getParameters()
-  {
-    return ImmutableList.of(
-        new Object[]{OnheapIncrementalIndex.class},
-        new Object[]{MapIncrementalIndex.class}
-    );
-  }
-
-  private final Class<? extends OnheapIncrementalIndex> incrementalIndex;
-
-  public OnheapIncrementalIndexBenchmark(Class<? extends OnheapIncrementalIndex> incrementalIndex)
-  {
-    this.incrementalIndex = incrementalIndex;
-  }
-
-
-  private static MapBasedInputRow getLongRow(long timestamp, int rowID, int dimensionCount)
-  {
-    List<String> dimensionList = new ArrayList<String>(dimensionCount);
-    ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
-    for (int i = 0; i < dimensionCount; i++) {
-      String dimName = StringUtils.format("Dim_%d", i);
-      dimensionList.add(dimName);
-      builder.put(dimName, new Integer(rowID).longValue());
-    }
-    return new MapBasedInputRow(timestamp, dimensionList, builder.build());
-  }
-
-  @Ignore
-  @Test
-  @BenchmarkOptions(callgc = true, clock = Clock.REAL_TIME, warmupRounds = 10, benchmarkRounds = 20)
-  public void testConcurrentAddRead()
-      throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException,
-             InvocationTargetException, InstantiationException
-  {
-
-    final int taskCount = 30;
-    final int concurrentThreads = 3;
-    final int elementsPerThread = 1 << 15;
-
-    final IncrementalIndex incrementalIndex = this.incrementalIndex.getConstructor(
-        IncrementalIndexSchema.class,
-        boolean.class,
-        boolean.class,
-        boolean.class,
-        boolean.class,
-        int.class
-    ).newInstance(
-        new IncrementalIndexSchema.Builder().withMetrics(factories).build(),
-        true,
-        true,
-        false,
-        true,
-        elementsPerThread * taskCount
-    );
-    final ArrayList<AggregatorFactory> queryAggregatorFactories = new ArrayList<>(DIMENSION_COUNT + 1);
-    queryAggregatorFactories.add(new CountAggregatorFactory("rows"));
-    for (int i = 0; i < DIMENSION_COUNT; ++i) {
-      queryAggregatorFactories.add(
-          new LongSumAggregatorFactory(
-              StringUtils.format("sumResult%s", i),
-              StringUtils.format("sumResult%s", i)
-          )
-      );
-      queryAggregatorFactories.add(
-          new DoubleSumAggregatorFactory(
-              StringUtils.format("doubleSumResult%s", i),
-              StringUtils.format("doubleSumResult%s", i)
-          )
-      );
-    }
-
-    final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator(
-        Executors.newFixedThreadPool(
-            concurrentThreads,
-            new ThreadFactoryBuilder()
-                .setDaemon(false)
-                .setNameFormat("index-executor-%d")
-                .setPriority(Thread.MIN_PRIORITY)
-                .build()
-        )
-    );
-    final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(
-        Executors.newFixedThreadPool(
-            concurrentThreads,
-            new ThreadFactoryBuilder()
-                .setDaemon(false)
-                .setNameFormat("query-executor-%d")
-                .build()
-        )
-    );
-    final long timestamp = System.currentTimeMillis();
-    final Interval queryInterval = Intervals.of("1900-01-01T00:00:00Z/2900-01-01T00:00:00Z");
-    final List<ListenableFuture<?>> indexFutures = new ArrayList<>();
-    final List<ListenableFuture<?>> queryFutures = new ArrayList<>();
-    final Segment incrementalIndexSegment = new IncrementalIndexSegment(incrementalIndex, null);
-    final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
-        new TimeseriesQueryQueryToolChest(),
-        new TimeseriesQueryEngine(),
-        QueryRunnerTestHelper.NOOP_QUERYWATCHER
-    );
-    final AtomicInteger currentlyRunning = new AtomicInteger(0);
-    final AtomicBoolean concurrentlyRan = new AtomicBoolean(false);
-    final AtomicBoolean someoneRan = new AtomicBoolean(false);
-    for (int j = 0; j < taskCount; j++) {
-      indexFutures.add(
-          indexExecutor.submit(
-              new Runnable()
-              {
-                @Override
-                public void run()
-                {
-                  currentlyRunning.incrementAndGet();
-                  try {
-                    for (int i = 0; i < elementsPerThread; i++) {
-                      incrementalIndex.add(getLongRow(timestamp + i, 1, DIMENSION_COUNT));
-                    }
-                  }
-                  catch (IndexSizeExceededException e) {
-                    throw new RuntimeException(e);
-                  }
-                  currentlyRunning.decrementAndGet();
-                  someoneRan.set(true);
-                }
-              }
-          )
-      );
-
-      queryFutures.add(
-          queryExecutor.submit(
-              new Runnable()
-              {
-                @Override
-                public void run()
-                {
-                  QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
-                      factory.createRunner(incrementalIndexSegment),
-                      factory.getToolchest()
-                  );
-                  TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
-                                                .dataSource("xxx")
-                                                .granularity(Granularities.ALL)
-                                                .intervals(ImmutableList.of(queryInterval))
-                                                .aggregators(queryAggregatorFactories)
-                                                .build();
-                  List<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
-                  for (Result<TimeseriesResultValue> result : results) {
-                    if (someoneRan.get()) {
-                      Assert.assertTrue(result.getValue().getDoubleMetric("doubleSumResult0") > 0);
-                    }
-                  }
-                  if (currentlyRunning.get() > 0) {
-                    concurrentlyRan.set(true);
-                  }
-                }
-              }
-          )
-      );
-
-    }
-    List<ListenableFuture<?>> allFutures = new ArrayList<>(queryFutures.size() + indexFutures.size());
-    allFutures.addAll(queryFutures);
-    allFutures.addAll(indexFutures);
-    Futures.allAsList(allFutures).get();
-    //Assert.assertTrue("Did not hit concurrency, please try again", concurrentlyRan.get());
-    queryExecutor.shutdown();
-    indexExecutor.shutdown();
-    QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
-        factory.createRunner(incrementalIndexSegment),
-        factory.getToolchest()
-    );
-    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
-                                  .dataSource("xxx")
-                                  .granularity(Granularities.ALL)
-                                  .intervals(ImmutableList.of(queryInterval))
-                                  .aggregators(queryAggregatorFactories)
-                                  .build();
-    List<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
-    final int expectedVal = elementsPerThread * taskCount;
-    for (Result<TimeseriesResultValue> result : results) {
-      Assert.assertEquals(elementsPerThread, result.getValue().getLongMetric("rows").intValue());
-      for (int i = 0; i < DIMENSION_COUNT; ++i) {
-        Assert.assertEquals(
-            StringUtils.format("Failed long sum on dimension %d", i),
-            expectedVal,
-            result.getValue().getLongMetric(StringUtils.format("sumResult%s", i)).intValue()
-        );
-        Assert.assertEquals(
-            StringUtils.format("Failed double sum on dimension %d", i),
-            expectedVal,
-            result.getValue().getDoubleMetric(StringUtils.format("doubleSumResult%s", i)).intValue()
-        );
-      }
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to