This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a644258d6 abstract `IncrementalIndex` cursor stuff to prepare for 
using different "views" of the data based on the cursor build spec (#17064)
73a644258d6 is described below

commit 73a644258d69c05baac127c17de90b23e0f79f5e
Author: Clint Wylie <[email protected]>
AuthorDate: Sun Sep 15 16:45:51 2024 -0700

    abstract `IncrementalIndex` cursor stuff to prepare for using different 
"views" of the data based on the cursor build spec (#17064)
    
    * abstract `IncrementalIndex` cursor stuff to prepare to allow for 
possibility of using different "views" of the data based on the cursor build 
spec
    changes:
    * introduce `IncrementalIndexRowSelector` interface to capture how 
`IncrementalIndexCursor` and `IncrementalIndexColumnSelectorFactory` read data
    * `IncrementalIndex` implements `IncrementalIndexRowSelector`
    * move `FactsHolder` interface to separate file
    * other minor refactorings
---
 .../druid/segment/incremental/FactsHolder.java     |  82 ++++
 .../segment/incremental/IncrementalIndex.java      | 453 ++++++++++-----------
 .../IncrementalIndexColumnSelectorFactory.java     |  22 +-
 .../incremental/IncrementalIndexCursorFactory.java |   6 +-
 .../incremental/IncrementalIndexCursorHolder.java  |  68 ++--
 .../segment/incremental/IncrementalIndexRow.java   |   2 +
 .../incremental/IncrementalIndexRowSelector.java   | 104 +++++
 .../incremental/OnheapIncrementalIndex.java        |  86 ++--
 .../processor/test/TestFrameProcessorUtils.java    |  17 +-
 .../druid/segment/AutoTypeColumnIndexerTest.java   |  44 +-
 .../segment/NestedDataColumnIndexerV4Test.java     |  21 +-
 .../incremental/IncrementalIndexIngestionTest.java |  32 +-
 .../IncrementalIndexMultiValueSpecTest.java        |  17 +-
 .../segment/virtual/ExpressionSelectorsTest.java   |  15 +-
 14 files changed, 572 insertions(+), 397 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java
new file mode 100644
index 00000000000..f7eede10150
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Comparator;
+import java.util.Iterator;
+
+/**
+ * {@link IncrementalIndexRow} storage interface, a mutable data structure for 
building up a set of rows to eventually
+ * persist into an immutable segment
+ *
+ * @see IncrementalIndex for the data processor which constructs {@link 
IncrementalIndexRow} to store here
+ */
+public interface FactsHolder
+{
+  /**
+   * @return the previous rowIndex associated with the specified key, or
+   * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for 
the key.
+   */
+  int getPriorIndex(IncrementalIndexRow key);
+
+  /**
+   * Get minimum {@link IncrementalIndexRow#getTimestamp()} present in the 
facts holder
+   */
+  long getMinTimeMillis();
+
+  /**
+   * Get maximum {@link IncrementalIndexRow#getTimestamp()} present in the 
facts holder
+   */
+  long getMaxTimeMillis();
+
+  /**
+   * Get all {@link IncrementalIndexRow}; depending on the implementation, these 
rows may or may not be ordered in the same
+   * order they will be persisted in. Use {@link #persistIterable()} if this 
is required.
+   */
+  Iterator<IncrementalIndexRow> iterator(boolean descending);
+
+  /**
+   * Get all {@link IncrementalIndexRow} with {@link 
IncrementalIndexRow#getTimestamp()} between the start and end
+   * timestamps specified
+   */
+  Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long 
timeStart, long timeEnd);
+
+  /**
+   * Get all {@link IncrementalIndexRow} 'keys', which are the distinct groups 
if this is an aggregating facts holder, or
+   * just every row present if not
+   */
+  Iterable<IncrementalIndexRow> keySet();
+
+  /**
+   * Get all {@link IncrementalIndexRow} to persist, ordered with {@link 
Comparator <IncrementalIndexRow>}
+   */
+  Iterable<IncrementalIndexRow> persistIterable();
+
+  /**
+   * @return the previous rowIndex associated with the specified key, or
+   * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for 
the key.
+   */
+  int putIfAbsent(IncrementalIndexRow key, int rowIndex);
+
+  /**
+   * Clear all rows present in the facts holder
+   */
+  void clear();
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index fc2a02c47b7..8adc47f6533 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -105,7 +105,7 @@ import java.util.stream.Collectors;
  * {@link IncrementalIndexCursorFactory} are thread-safe, and may be called 
concurrently with each other, and with
  * the "add" methods. This concurrency model supports real-time queries of the 
data in the index.
  */
-public abstract class IncrementalIndex implements Iterable<Row>, Closeable, 
ColumnInspector
+public abstract class IncrementalIndex implements IncrementalIndexRowSelector, 
ColumnInspector, Iterable<Row>, Closeable
 {
   /**
    * Column selector used at ingestion time for inputs to aggregators.
@@ -255,8 +255,9 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
 
   private final boolean useSchemaDiscovery;
 
-  private final InputRowHolder inputRowHolder = new InputRowHolder();
+  protected final InputRowHolder inputRowHolder = new InputRowHolder();
 
+  @Nullable
   private volatile DateTime maxIngestedEventTime;
 
   /**
@@ -366,8 +367,6 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     );
   }
 
-  public abstract FactsHolder getFacts();
-
   public abstract boolean canAppendRow();
 
   public abstract String getOutOfRowsReason();
@@ -384,100 +383,11 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
       boolean skipMaxRowsInMemoryCheck
   ) throws IndexSizeExceededException;
 
-  public abstract int getLastRowIndex();
-
-  protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);
-
-  protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
-
-  protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset);
-
-  protected abstract double getMetricDoubleValue(int rowOffset, int aggOffset);
-
-  protected abstract boolean isNull(int rowOffset, int aggOffset);
-
-  static class IncrementalIndexRowResult
-  {
-    private final IncrementalIndexRow incrementalIndexRow;
-    private final List<String> parseExceptionMessages;
-
-    IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, 
List<String> parseExceptionMessages)
-    {
-      this.incrementalIndexRow = incrementalIndexRow;
-      this.parseExceptionMessages = parseExceptionMessages;
-    }
-
-    IncrementalIndexRow getIncrementalIndexRow()
-    {
-      return incrementalIndexRow;
-    }
-
-    List<String> getParseExceptionMessages()
-    {
-      return parseExceptionMessages;
-    }
-  }
-
-  static class AddToFactsResult
-  {
-    private final int rowCount;
-    private final long bytesInMemory;
-    private final List<String> parseExceptionMessages;
-
-    public AddToFactsResult(
-        int rowCount,
-        long bytesInMemory,
-        List<String> parseExceptionMessages
-    )
-    {
-      this.rowCount = rowCount;
-      this.bytesInMemory = bytesInMemory;
-      this.parseExceptionMessages = parseExceptionMessages;
-    }
-
-    int getRowCount()
-    {
-      return rowCount;
-    }
-
-    public long getBytesInMemory()
-    {
-      return bytesInMemory;
-    }
-
-    public List<String> getParseExceptionMessages()
-    {
-      return parseExceptionMessages;
-    }
-  }
-
-  public static class InputRowHolder
-  {
-    @Nullable
-    private InputRow row;
-    private long rowId = -1;
-
-    public void set(final InputRow row)
-    {
-      this.row = row;
-      this.rowId++;
-    }
 
-    public void unset()
-    {
-      this.row = null;
-    }
-
-    public InputRow getRow()
-    {
-      return Preconditions.checkNotNull(row, "row");
-    }
-
-    public long getRowId()
-    {
-      return rowId;
-    }
-  }
+  public abstract Iterable<Row> iterableWithPostAggregations(
+      @Nullable List<PostAggregator> postAggs,
+      boolean descending
+  );
 
   public boolean isRollup()
   {
@@ -746,23 +656,6 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     );
   }
 
-  private static String getSimplifiedEventStringFromRow(InputRow inputRow)
-  {
-    if (inputRow instanceof MapBasedInputRow) {
-      return ((MapBasedInputRow) inputRow).getEvent().toString();
-    }
-
-    if (inputRow instanceof ListBasedInputRow) {
-      return ((ListBasedInputRow) inputRow).asMap().toString();
-    }
-
-    if (inputRow instanceof TransformedInputRow) {
-      InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow();
-      return getSimplifiedEventStringFromRow(innerRow);
-    }
-
-    return inputRow.toString();
-  }
 
   private synchronized void updateMaxIngestedTime(DateTime eventTime)
   {
@@ -771,6 +664,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     }
   }
 
+  @Override
   public boolean isEmpty()
   {
     return numEntries.get() == 0;
@@ -861,6 +755,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
   /**
    * Returns the descriptor for a particular dimension.
    */
+  @Override
   @Nullable
   public DimensionDesc getDimension(String dimension)
   {
@@ -869,22 +764,39 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     }
   }
 
-  public ColumnValueSelector<?> makeMetricColumnValueSelector(String metric, 
IncrementalIndexRowHolder currEntry)
+  @Override
+  @Nullable
+  public MetricDesc getMetric(String metric)
   {
-    MetricDesc metricDesc = metricDescs.get(metric);
+    return metricDescs.get(metric);
+  }
+
+  @Override
+  public List<OrderBy> getOrdering()
+  {
+    return metadata.getOrdering();
+  }
+
+  public static ColumnValueSelector<?> makeMetricColumnValueSelector(
+      IncrementalIndexRowSelector rowSelector,
+      IncrementalIndexRowHolder currEntry,
+      String metric
+  )
+  {
+    final MetricDesc metricDesc = rowSelector.getMetric(metric);
     if (metricDesc == null) {
       return NilColumnValueSelector.instance();
     }
     int metricIndex = metricDesc.getIndex();
     switch (metricDesc.getCapabilities().getType()) {
       case COMPLEX:
-        return new ObjectMetricColumnSelector(metricDesc, currEntry, 
metricIndex);
+        return new ObjectMetricColumnSelector(rowSelector, currEntry, 
metricDesc);
       case LONG:
-        return new LongMetricColumnSelector(currEntry, metricIndex);
+        return new LongMetricColumnSelector(rowSelector, currEntry, 
metricIndex);
       case FLOAT:
-        return new FloatMetricColumnSelector(currEntry, metricIndex);
+        return new FloatMetricColumnSelector(rowSelector, currEntry, 
metricIndex);
       case DOUBLE:
-        return new DoubleMetricColumnSelector(currEntry, metricIndex);
+        return new DoubleMetricColumnSelector(rowSelector, currEntry, 
metricIndex);
       case STRING:
         throw new IllegalStateException("String is not a metric column type");
       default:
@@ -910,13 +822,6 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     return isEmpty() ? null : DateTimes.utc(getMaxTimeMillis());
   }
 
-  @Nullable
-  public Integer getDimensionIndex(String dimension)
-  {
-    DimensionDesc dimSpec = getDimension(dimension);
-    return dimSpec == null ? null : dimSpec.getIndex();
-  }
-
   /**
    * Returns names of time and dimension columns, in persist sort order. 
Includes {@link ColumnHolder#TIME_COLUMN_NAME}.
    */
@@ -1003,6 +908,49 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     return metadata;
   }
 
+  @Override
+  public Iterator<Row> iterator()
+  {
+    return iterableWithPostAggregations(null, false).iterator();
+  }
+
+  public DateTime getMaxIngestedEventTime()
+  {
+    return maxIngestedEventTime;
+  }
+
+  protected ColumnSelectorFactory makeColumnSelectorFactory(
+      @Nullable final AggregatorFactory agg,
+      final InputRowHolder in
+  )
+  {
+    return makeColumnSelectorFactory(virtualColumns, in, agg);
+  }
+
+  protected final Comparator<IncrementalIndexRow> dimsComparator()
+  {
+    return new IncrementalIndexRowComparator(timePosition, dimensionDescsList);
+  }
+
+
+  private static String getSimplifiedEventStringFromRow(InputRow inputRow)
+  {
+    if (inputRow instanceof MapBasedInputRow) {
+      return ((MapBasedInputRow) inputRow).getEvent().toString();
+    }
+
+    if (inputRow instanceof ListBasedInputRow) {
+      return ((ListBasedInputRow) inputRow).asMap().toString();
+    }
+
+    if (inputRow instanceof TransformedInputRow) {
+      InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow();
+      return getSimplifiedEventStringFromRow(innerRow);
+    }
+
+    return inputRow.toString();
+  }
+
   private static AggregatorFactory[] 
getCombiningAggregators(AggregatorFactory[] aggregators)
   {
     AggregatorFactory[] combiningAggregators = new 
AggregatorFactory[aggregators.length];
@@ -1012,30 +960,24 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     return combiningAggregators;
   }
 
-  @Override
-  public Iterator<Row> iterator()
-  {
-    return iterableWithPostAggregations(null, false).iterator();
-  }
-
-  public abstract Iterable<Row> iterableWithPostAggregations(
-      @Nullable List<PostAggregator> postAggs,
-      boolean descending
-  );
-
-  public DateTime getMaxIngestedEventTime()
+  private static boolean allNull(Object[] dims, int startPosition)
   {
-    return maxIngestedEventTime;
+    for (int i = startPosition; i < dims.length; i++) {
+      if (dims[i] != null) {
+        return false;
+      }
+    }
+    return true;
   }
 
   public static final class DimensionDesc
   {
     private final int index;
     private final String name;
-    private final DimensionHandler handler;
-    private final DimensionIndexer indexer;
+    private final DimensionHandler<?, ?, ?> handler;
+    private final DimensionIndexer<?, ?, ?> indexer;
 
-    public DimensionDesc(int index, String name, DimensionHandler handler, 
boolean useMaxMemoryEstimates)
+    public DimensionDesc(int index, String name, DimensionHandler<?, ?, ?> 
handler, boolean useMaxMemoryEstimates)
     {
       this.index = index;
       this.name = name;
@@ -1058,12 +1000,12 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
       return indexer.getColumnCapabilities();
     }
 
-    public DimensionHandler getHandler()
+    public DimensionHandler<?, ?, ?> getHandler()
     {
       return handler;
     }
 
-    public DimensionIndexer getIndexer()
+    public DimensionIndexer<?, ?, ?> getIndexer()
     {
       return indexer;
     }
@@ -1124,19 +1066,90 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     }
   }
 
-  protected ColumnSelectorFactory makeColumnSelectorFactory(
-      @Nullable final AggregatorFactory agg,
-      final InputRowHolder in
-  )
+  public static class AddToFactsResult
   {
-    return makeColumnSelectorFactory(virtualColumns, in, agg);
+    private final int rowCount;
+    private final long bytesInMemory;
+    private final List<String> parseExceptionMessages;
+
+    public AddToFactsResult(
+        int rowCount,
+        long bytesInMemory,
+        List<String> parseExceptionMessages
+    )
+    {
+      this.rowCount = rowCount;
+      this.bytesInMemory = bytesInMemory;
+      this.parseExceptionMessages = parseExceptionMessages;
+    }
+
+    int getRowCount()
+    {
+      return rowCount;
+    }
+
+    public long getBytesInMemory()
+    {
+      return bytesInMemory;
+    }
+
+    public List<String> getParseExceptionMessages()
+    {
+      return parseExceptionMessages;
+    }
   }
 
-  protected final Comparator<IncrementalIndexRow> dimsComparator()
+  public static class InputRowHolder
   {
-    return new IncrementalIndexRowComparator(timePosition, dimensionDescsList);
+    @Nullable
+    private InputRow row;
+    private long rowId = -1;
+
+    public void set(final InputRow row)
+    {
+      this.row = row;
+      this.rowId++;
+    }
+
+    public void unset()
+    {
+      this.row = null;
+    }
+
+    public InputRow getRow()
+    {
+      return Preconditions.checkNotNull(row, "row");
+    }
+
+    public long getRowId()
+    {
+      return rowId;
+    }
   }
 
+  static class IncrementalIndexRowResult
+  {
+    private final IncrementalIndexRow incrementalIndexRow;
+    private final List<String> parseExceptionMessages;
+
+    IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, 
List<String> parseExceptionMessages)
+    {
+      this.incrementalIndexRow = incrementalIndexRow;
+      this.parseExceptionMessages = parseExceptionMessages;
+    }
+
+    IncrementalIndexRow getIncrementalIndexRow()
+    {
+      return incrementalIndexRow;
+    }
+
+    List<String> getParseExceptionMessages()
+    {
+      return parseExceptionMessages;
+    }
+  }
+
+
   @VisibleForTesting
   static final class IncrementalIndexRowComparator implements 
Comparator<IncrementalIndexRow>
   {
@@ -1207,57 +1220,19 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     }
   }
 
-  private static boolean allNull(Object[] dims, int startPosition)
-  {
-    for (int i = startPosition; i < dims.length; i++) {
-      if (dims[i] != null) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public interface FactsHolder
-  {
-    /**
-     * @return the previous rowIndex associated with the specified key, or
-     * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for 
the key.
-     */
-    int getPriorIndex(IncrementalIndexRow key);
-
-    long getMinTimeMillis();
-
-    long getMaxTimeMillis();
-
-    Iterator<IncrementalIndexRow> iterator(boolean descending);
-
-    Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long 
timeStart, long timeEnd);
-
-    Iterable<IncrementalIndexRow> keySet();
-
-    /**
-     * Get all {@link IncrementalIndexRow} to persist, ordered with {@link 
Comparator<IncrementalIndexRow>}
-     *
-     * @return
-     */
-    Iterable<IncrementalIndexRow> persistIterable();
-
-    /**
-     * @return the previous rowIndex associated with the specified key, or
-     * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for 
the key.
-     */
-    int putIfAbsent(IncrementalIndexRow key, int rowIndex);
-
-    void clear();
-  }
-
-  private final class LongMetricColumnSelector implements LongColumnSelector
+  private static final class LongMetricColumnSelector implements 
LongColumnSelector
   {
+    private final IncrementalIndexRowSelector rowSelector;
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
 
-    public LongMetricColumnSelector(IncrementalIndexRowHolder currEntry, int 
metricIndex)
+    public LongMetricColumnSelector(
+        IncrementalIndexRowSelector rowSelector,
+        IncrementalIndexRowHolder currEntry,
+        int metricIndex
+    )
     {
+      this.rowSelector = rowSelector;
       this.currEntry = currEntry;
       this.metricIndex = metricIndex;
     }
@@ -1265,119 +1240,131 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
     @Override
     public long getLong()
     {
-      assert NullHandling.replaceWithDefault() || !isNull();
-      return getMetricLongValue(currEntry.get().getRowIndex(), metricIndex);
+      return rowSelector.getMetricLongValue(currEntry.get().getRowIndex(), 
metricIndex);
     }
 
     @Override
-    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    public boolean isNull()
     {
-      inspector.visit("index", IncrementalIndex.this);
+      return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
     }
 
     @Override
-    public boolean isNull()
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
     {
-      return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), 
metricIndex);
+      inspector.visit("index", rowSelector);
     }
   }
 
-  private final class ObjectMetricColumnSelector extends ObjectColumnSelector
+  private static final class FloatMetricColumnSelector implements 
FloatColumnSelector
   {
+    private final IncrementalIndexRowSelector rowSelector;
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
-    private Class classOfObject;
 
-    public ObjectMetricColumnSelector(
-        MetricDesc metricDesc,
+    public FloatMetricColumnSelector(
+        IncrementalIndexRowSelector rowSelector,
         IncrementalIndexRowHolder currEntry,
         int metricIndex
     )
     {
       this.currEntry = currEntry;
+      this.rowSelector = rowSelector;
       this.metricIndex = metricIndex;
-      classOfObject = 
ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
     }
 
-    @Nullable
     @Override
-    public Object getObject()
+    public float getFloat()
     {
-      return getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex);
+      return rowSelector.getMetricFloatValue(currEntry.get().getRowIndex(), 
metricIndex);
     }
 
     @Override
-    public Class classOfObject()
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
     {
-      return classOfObject;
+      inspector.visit("index", rowSelector);
     }
 
     @Override
-    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    public boolean isNull()
     {
-      inspector.visit("index", IncrementalIndex.this);
+      return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
     }
   }
 
-  private final class FloatMetricColumnSelector implements FloatColumnSelector
+  private static final class DoubleMetricColumnSelector implements 
DoubleColumnSelector
   {
+    private final IncrementalIndexRowSelector rowSelector;
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
 
-    public FloatMetricColumnSelector(IncrementalIndexRowHolder currEntry, int 
metricIndex)
+    public DoubleMetricColumnSelector(
+        IncrementalIndexRowSelector rowSelector,
+        IncrementalIndexRowHolder currEntry,
+        int metricIndex
+    )
     {
       this.currEntry = currEntry;
+      this.rowSelector = rowSelector;
       this.metricIndex = metricIndex;
     }
 
     @Override
-    public float getFloat()
+    public double getDouble()
     {
       assert NullHandling.replaceWithDefault() || !isNull();
-      return getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex);
+      return rowSelector.getMetricDoubleValue(currEntry.get().getRowIndex(), 
metricIndex);
     }
 
     @Override
-    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    public boolean isNull()
     {
-      inspector.visit("index", IncrementalIndex.this);
+      return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
     }
 
     @Override
-    public boolean isNull()
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
     {
-      return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), 
metricIndex);
+      inspector.visit("index", rowSelector);
     }
   }
 
-  private final class DoubleMetricColumnSelector implements 
DoubleColumnSelector
+  private static final class ObjectMetricColumnSelector extends 
ObjectColumnSelector
   {
+    private final IncrementalIndexRowSelector rowSelector;
     private final IncrementalIndexRowHolder currEntry;
     private final int metricIndex;
+    private final Class<?> classOfObject;
 
-    public DoubleMetricColumnSelector(IncrementalIndexRowHolder currEntry, int 
metricIndex)
+    public ObjectMetricColumnSelector(
+        IncrementalIndexRowSelector rowSelector,
+        IncrementalIndexRowHolder currEntry,
+        MetricDesc metricDesc
+    )
     {
       this.currEntry = currEntry;
-      this.metricIndex = metricIndex;
+      this.rowSelector = rowSelector;
+      this.metricIndex = metricDesc.getIndex();
+      this.classOfObject = 
ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
     }
 
+    @Nullable
     @Override
-    public double getDouble()
+    public Object getObject()
     {
-      assert NullHandling.replaceWithDefault() || !isNull();
-      return getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex);
+      return rowSelector.getMetricObjectValue(currEntry.get().getRowIndex(), 
metricIndex);
     }
 
     @Override
-    public boolean isNull()
+    public Class<?> classOfObject()
     {
-      return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), 
metricIndex);
+      return classOfObject;
     }
 
     @Override
     public void inspectRuntimeShape(RuntimeShapeInspector inspector)
     {
-      inspector.visit("index", IncrementalIndex.this);
+      inspector.visit("index", rowSelector);
     }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
index 86e8c6690c2..9d60edef044 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
@@ -43,29 +43,29 @@ import javax.annotation.Nullable;
 class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory, 
RowIdSupplier
 {
   private final ColumnInspector snapshotColumnInspector;
-  private final IncrementalIndex index;
   private final VirtualColumns virtualColumns;
   private final Order timeOrder;
   private final IncrementalIndexRowHolder rowHolder;
+  private final IncrementalIndexRowSelector rowSelector;
 
   IncrementalIndexColumnSelectorFactory(
-      IncrementalIndex index,
+      IncrementalIndexRowSelector rowSelector,
       VirtualColumns virtualColumns,
       Order timeOrder,
       IncrementalIndexRowHolder rowHolder
   )
   {
-    this.index = index;
     this.virtualColumns = virtualColumns;
     this.timeOrder = timeOrder;
     this.rowHolder = rowHolder;
+    this.rowSelector = rowSelector;
     this.snapshotColumnInspector = new ColumnInspector()
     {
       @Nullable
       @Override
       public ColumnCapabilities getColumnCapabilities(String column)
       {
-        return IncrementalIndexCursorFactory.snapshotColumnCapabilities(index, 
column);
+        return 
IncrementalIndexCursorFactory.snapshotColumnCapabilities(rowSelector, column);
       }
     };
   }
@@ -87,13 +87,13 @@ class IncrementalIndexColumnSelectorFactory implements 
ColumnSelectorFactory, Ro
 
     if (dimension.equals(ColumnHolder.TIME_COLUMN_NAME) && timeOrder != 
Order.NONE) {
       return new SingleScanTimeDimensionSelector(
-          makeColumnValueSelector(dimension),
+          makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
           extractionFn,
           timeOrder
       );
     }
 
-    final IncrementalIndex.DimensionDesc dimensionDesc = 
index.getDimension(dimensionSpec.getDimension());
+    final IncrementalIndex.DimensionDesc dimensionDesc = 
rowSelector.getDimension(dimensionSpec.getDimension());
     if (dimensionDesc == null) {
       // not a dimension, column may be a metric
       ColumnCapabilities capabilities = getColumnCapabilities(dimension);
@@ -122,19 +122,17 @@ class IncrementalIndexColumnSelectorFactory implements 
ColumnSelectorFactory, Ro
     if (virtualColumns.exists(columnName)) {
       return virtualColumns.makeColumnValueSelector(columnName, this);
     }
-
-    if (columnName.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+    if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
       return rowHolder;
     }
 
-    final Integer dimIndex = index.getDimensionIndex(columnName);
-    if (dimIndex != null) {
-      final IncrementalIndex.DimensionDesc dimensionDesc = 
index.getDimension(columnName);
+    final IncrementalIndex.DimensionDesc dimensionDesc = 
rowSelector.getDimension(columnName);
+    if (dimensionDesc != null) {
       final DimensionIndexer indexer = dimensionDesc.getIndexer();
       return indexer.makeColumnValueSelector(rowHolder, dimensionDesc);
     }
 
-    return index.makeMetricColumnValueSelector(columnName, rowHolder);
+    return IncrementalIndex.makeMetricColumnValueSelector(rowSelector, 
rowHolder, columnName);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
index e034f820dfb..b73a7b682a3 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
@@ -99,9 +99,9 @@ public class IncrementalIndexCursorFactory implements 
CursorFactory
     return snapshotColumnCapabilities(index, column);
   }
 
-  static ColumnCapabilities snapshotColumnCapabilities(IncrementalIndex index, 
String column)
+  static ColumnCapabilities 
snapshotColumnCapabilities(IncrementalIndexRowSelector selector, String column)
   {
-    IncrementalIndex.DimensionDesc desc = index.getDimension(column);
+    IncrementalIndex.DimensionDesc desc = selector.getDimension(column);
     // nested column indexer is a liar, and behaves like any type if it only 
processes unnested literals of a single
     // type, so force it to use nested column type
     if (desc != null && desc.getIndexer() instanceof 
NestedDataColumnIndexerV4) {
@@ -122,7 +122,7 @@ public class IncrementalIndexCursorFactory implements 
CursorFactory
     // multi-valuedness at cursor creation time, instead of the latest state, 
and getSnapshotColumnCapabilities could
     // be removed.
     return ColumnCapabilitiesImpl.snapshot(
-        index.getColumnCapabilities(column),
+        selector.getColumnCapabilities(column),
         COERCE_LOGIC
     );
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
index 02c09398d8e..72ec9116d1f 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
@@ -23,52 +23,46 @@ import com.google.common.collect.Iterators;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.Order;
 import org.apache.druid.query.OrderBy;
-import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.ValueMatcher;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.Cursors;
-import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.filter.ValueMatchers;
-import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
 public class IncrementalIndexCursorHolder implements CursorHolder
 {
-  private final IncrementalIndex index;
+  private final IncrementalIndexRowSelector rowSelector;
   private final CursorBuildSpec spec;
   private final List<OrderBy> ordering;
 
   public IncrementalIndexCursorHolder(
-      IncrementalIndex index,
+      IncrementalIndexRowSelector rowSelector,
       CursorBuildSpec spec
   )
   {
-    this.index = index;
+    this.rowSelector = rowSelector;
     this.spec = spec;
-    if (index.timePosition == 0) {
+    List<OrderBy> ordering = rowSelector.getOrdering();
+    if (Cursors.getTimeOrdering(ordering) != Order.NONE) {
       if (Cursors.preferDescendingTimeOrdering(spec)) {
         this.ordering = Cursors.descendingTimeOrder();
       } else {
         this.ordering = Cursors.ascendingTimeOrder();
       }
     } else {
-      // In principle, we could report a sort order here for certain types of 
fact holders; for example the
-      // RollupFactsHolder would be sorted by dimensions. However, this is 
left for future work.
-      this.ordering = Collections.emptyList();
+      this.ordering = ordering;
     }
   }
 
   @Override
   public Cursor asCursor()
   {
-    if (index.isEmpty()) {
+    if (rowSelector.isEmpty()) {
       return null;
     }
 
@@ -76,13 +70,10 @@ public class IncrementalIndexCursorHolder implements 
CursorHolder
       spec.getQueryMetrics().vectorized(false);
     }
 
-
     return new IncrementalIndexCursor(
-        index,
-        spec.getVirtualColumns(),
-        Cursors.getTimeOrdering(ordering),
-        spec.getFilter(),
-        spec.getInterval()
+        rowSelector,
+        spec,
+        Cursors.getTimeOrdering(ordering)
     );
   }
 
@@ -94,11 +85,11 @@ public class IncrementalIndexCursorHolder implements 
CursorHolder
 
   static class IncrementalIndexCursor implements Cursor
   {
-    private IncrementalIndexRowHolder currEntry;
+    private final IncrementalIndexRowSelector rowSelector;
+    private final IncrementalIndexRowHolder currEntry;
     private final ColumnSelectorFactory columnSelectorFactory;
     private final ValueMatcher filterMatcher;
     private final int maxRowIndex;
-    private final IncrementalIndex.FactsHolder facts;
     private Iterator<IncrementalIndexRow> baseIter;
     private Iterable<IncrementalIndexRow> cursorIterable;
     private boolean emptyRange;
@@ -106,30 +97,31 @@ public class IncrementalIndexCursorHolder implements 
CursorHolder
     private boolean done;
 
     IncrementalIndexCursor(
-        IncrementalIndex index,
-        VirtualColumns virtualColumns,
-        Order timeOrder,
-        @Nullable Filter filter,
-        Interval actualInterval
+        IncrementalIndexRowSelector index,
+        CursorBuildSpec buildSpec,
+        Order timeOrder
     )
     {
       currEntry = new IncrementalIndexRowHolder();
-      columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(
-          index,
-          virtualColumns,
-          timeOrder,
-          currEntry
-      );
       // Set maxRowIndex before creating the filterMatcher. See 
https://github.com/apache/druid/pull/6340
       maxRowIndex = index.getLastRowIndex();
-      filterMatcher = filter == null ? ValueMatchers.allTrue() : 
filter.makeMatcher(columnSelectorFactory);
       numAdvanced = -1;
-      facts = index.getFacts();
-      cursorIterable = facts.timeRangeIterable(
+
+      rowSelector = index;
+      cursorIterable = rowSelector.getFacts().timeRangeIterable(
           timeOrder == Order.DESCENDING,
-          actualInterval.getStartMillis(),
-          actualInterval.getEndMillis()
+          buildSpec.getInterval().getStartMillis(),
+          buildSpec.getInterval().getEndMillis()
       );
+      columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(
+          rowSelector,
+          buildSpec.getVirtualColumns(),
+          timeOrder,
+          currEntry
+      );
+      filterMatcher = buildSpec.getFilter() == null
+                      ? ValueMatchers.allTrue()
+                      : 
buildSpec.getFilter().makeMatcher(columnSelectorFactory);
       emptyRange = !cursorIterable.iterator().hasNext();
 
       reset();
@@ -152,7 +144,7 @@ public class IncrementalIndexCursorHolder implements 
CursorHolder
       while (baseIter.hasNext()) {
         BaseQuery.checkInterrupted();
 
-        IncrementalIndexRow entry = baseIter.next();
+        final IncrementalIndexRow entry = baseIter.next();
         if (beyondMaxRowIndex(entry.getRowIndex())) {
           continue;
         }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
index 89e94961f6b..2e817b993ce 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
@@ -144,6 +144,8 @@ public final class IncrementalIndexRow
           {
             if (input == null || (input.getClass().isArray() && 
Array.getLength(input) == 0)) {
               return Collections.singletonList("null");
+            } else if (input instanceof int[]) {
+              return Arrays.toString((int[]) input);
             }
             return Collections.singletonList(input);
           }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java
new file mode 100644
index 00000000000..bafa127e881
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.segment.ColumnInspector;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Interface that abstracts selecting data from a {@link FactsHolder}
+ */
+public interface IncrementalIndexRowSelector extends ColumnInspector
+{
+  /**
+   * Get {@link IncrementalIndex.DimensionDesc} for the specified column, if 
available, which provides access to things
+   * like {@link org.apache.druid.segment.DimensionIndexer} and {@link 
org.apache.druid.segment.DimensionHandler} as
+   * well as column capabilities and position within the row
+   */
+  @Nullable
+  IncrementalIndex.DimensionDesc getDimension(String columnName);
+
+  /**
+   * Get {@link IncrementalIndex.MetricDesc} which provides column 
capabilities and position in the aggregators section
+   * of the row
+   */
+  @Nullable
+  IncrementalIndex.MetricDesc getMetric(String s);
+
+  /**
+   * Ordering for the data in the facts table
+   */
+  List<OrderBy> getOrdering();
+
+  /**
+   * Are there any {@link IncrementalIndexRow} stored in the {@link 
FactsHolder}?
+   */
+  boolean isEmpty();
+
+  /**
+   * Get the {@link FactsHolder} containing all of the {@link 
IncrementalIndexRow} backing this selector
+   */
+  FactsHolder getFacts();
+
+  /**
+   * Highest value {@link IncrementalIndexRow#getRowIndex()} available in this 
selector. Note that these values do not
+   * reflect the position of the row in the {@link FactsHolder}, rather just 
the order in which they were processed
+   */
+  int getLastRowIndex();
+
+  /**
+   * @param rowOffset row to get float aggregator value
+   * @param aggOffset position of the aggregator in the aggregators array of 
the data schema
+   * @return          float value of the metric
+   */
+  float getMetricFloatValue(int rowOffset, int aggOffset);
+
+  /**
+   * @param rowOffset row to get long aggregator value
+   * @param aggOffset position of the aggregator in the aggregators array of 
the data schema
+   * @return          long value of the aggregator for this row
+   */
+  long getMetricLongValue(int rowOffset, int aggOffset);
+
+  /**
+   * @param rowOffset row to get double aggregator value
+   * @param aggOffset position of the aggregator in the aggregators array of 
the data schema
+   * @return          double value of the aggregator for this row
+   */
+  double getMetricDoubleValue(int rowOffset, int aggOffset);
+
+  /**
+   * @param rowOffset row to get object aggregator value
+   * @param aggOffset position of the aggregator in the aggregators array of 
the data schema
+   * @return          object value of the aggregator for this row, possibly null
+   */
+  @Nullable
+  Object getMetricObjectValue(int rowOffset, int aggOffset);
+
+  /**
+   * @param rowOffset row to check for an aggregator value
+   * @param aggOffset position of the aggregator in the aggregators array of 
the data schema
+   * @return          is the value null for this row?
+   */
+  boolean isNull(int rowOffset, int aggOffset);
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index b5e580f44f2..8c554e016fc 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -155,7 +155,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     } else {
       this.facts = new PlainNonTimeOrderedFactsHolder(dimsComparator());
     }
-    maxBytesPerRowForAggregators =
+    this.maxBytesPerRowForAggregators =
         useMaxMemoryEstimates ? 
getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0;
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
   }
@@ -252,14 +252,15 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
   ) throws IndexSizeExceededException
   {
     final List<String> parseExceptionMessages = new ArrayList<>();
+    final AtomicLong totalSizeInBytes = getBytesInMemory();
+
     final int priorIndex = facts.getPriorIndex(key);
 
     Aggregator[] aggs;
     final AggregatorFactory[] metrics = getMetrics();
     final AtomicInteger numEntries = getNumEntries();
-    final AtomicLong totalSizeInBytes = getBytesInMemory();
     if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
-      aggs = concurrentGet(priorIndex);
+      aggs = aggregators.get(priorIndex);
       long aggSizeDelta = doAggregate(metrics, aggs, inputRowHolder, 
parseExceptionMessages);
       totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta);
     } else {
@@ -272,7 +273,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
       aggSizeForRow += doAggregate(metrics, aggs, inputRowHolder, 
parseExceptionMessages);
 
       final int rowIndex = indexIncrement.getAndIncrement();
-      concurrentSet(rowIndex, aggs);
+      aggregators.put(rowIndex, aggs);
 
       // Last ditch sanity checks
       if ((numEntries.get() >= maxRowCount || totalSizeInBytes.get() >= 
maxBytesInMemory)
@@ -363,6 +364,18 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
       InputRowHolder inputRowHolder,
       List<String> parseExceptionsHolder
   )
+  {
+    return doAggregate(metrics, aggs, inputRowHolder, parseExceptionsHolder, 
useMaxMemoryEstimates, preserveExistingMetrics);
+  }
+
+  private static long doAggregate(
+      AggregatorFactory[] metrics,
+      Aggregator[] aggs,
+      InputRowHolder inputRowHolder,
+      List<String> parseExceptionsHolder,
+      boolean useMaxMemoryEstimates,
+      boolean preserveExistingMetrics
+  )
   {
     long totalIncrementalBytes = 0L;
     for (int i = 0; i < metrics.length; i++) {
@@ -418,17 +431,6 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
     }
   }
 
-  protected Aggregator[] concurrentGet(int offset)
-  {
-    // All get operations should be fine
-    return aggregators.get(offset);
-  }
-
-  protected void concurrentSet(int offset, Aggregator[] value)
-  {
-    aggregators.put(offset, value);
-  }
-
   @Override
   public boolean canAppendRow()
   {
@@ -459,42 +461,53 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
     return outOfRowsReason;
   }
 
-  protected Aggregator[] getAggsForRow(int rowOffset)
-  {
-    return concurrentGet(rowOffset);
-  }
-
   @Override
   public float getMetricFloatValue(int rowOffset, int aggOffset)
   {
-    return ((Number) getMetricHelper(getMetricAggs(), 
concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
+    return ((Number) getMetricHelper(
+        getMetricAggs(),
+        aggregators.get(rowOffset),
+        aggOffset,
+        Aggregator::getFloat
+    )).floatValue();
   }
 
   @Override
   public long getMetricLongValue(int rowOffset, int aggOffset)
   {
-    return ((Number) getMetricHelper(getMetricAggs(), 
concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
+    return ((Number) getMetricHelper(
+        getMetricAggs(),
+        aggregators.get(rowOffset),
+        aggOffset,
+        Aggregator::getLong
+    )).longValue();
   }
 
   @Override
-  public Object getMetricObjectValue(int rowOffset, int aggOffset)
+  public double getMetricDoubleValue(int rowOffset, int aggOffset)
   {
-    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), 
aggOffset, Aggregator::get);
+    return ((Number) getMetricHelper(
+        getMetricAggs(),
+        aggregators.get(rowOffset),
+        aggOffset,
+        Aggregator::getDouble
+    )).doubleValue();
   }
 
   @Override
-  protected double getMetricDoubleValue(int rowOffset, int aggOffset)
+  public Object getMetricObjectValue(int rowOffset, int aggOffset)
   {
-    return ((Number) getMetricHelper(getMetricAggs(), 
concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
+    return getMetricHelper(getMetricAggs(), aggregators.get(rowOffset), 
aggOffset, Aggregator::get);
   }
 
   @Override
   public boolean isNull(int rowOffset, int aggOffset)
   {
+    final Aggregator[] aggs = aggregators.get(rowOffset);
     if (preserveExistingMetrics) {
-      return concurrentGet(rowOffset)[aggOffset].isNull() && 
concurrentGet(rowOffset)[aggOffset + getMetricAggs().length].isNull();
+      return aggs[aggOffset].isNull() && aggs[aggOffset + 
getMetricAggs().length].isNull();
     } else {
-      return concurrentGet(rowOffset)[aggOffset].isNull();
+      return aggs[aggOffset].isNull();
     }
   }
 
@@ -535,7 +548,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
                 theVals.put(dimensionName, rowVals);
               }
 
-              Aggregator[] aggs = getAggsForRow(rowOffset);
+              Aggregator[] aggs = aggregators.get(rowOffset);
               int aggLength = preserveExistingMetrics ? aggs.length / 2 : 
aggs.length;
               for (int i = 0; i < aggLength; ++i) {
                 theVals.put(metrics[i].getName(), getMetricHelper(metrics, 
aggs, i, Aggregator::get));
@@ -560,11 +573,16 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
    * for aggregating from input into output field and the aggregator for 
combining already aggregated field, as needed
    */
   @Nullable
-  private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] 
aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+  private <T> Object getMetricHelper(
+      AggregatorFactory[] metrics,
+      Aggregator[] aggs,
+      int aggOffset,
+      Function<Aggregator, T> getMetricTypeFunction
+  )
   {
     if (preserveExistingMetrics) {
-      // Since the preserveExistingMetrics flag is set, we will have to check 
and possibly retrieve the aggregated values
-      // from two aggregators, the aggregator for aggregating from input into 
output field and the aggregator
+      // Since the preserveExistingMetrics flag is set, we will have to check 
and possibly retrieve the aggregated
+      // values from two aggregators, the aggregator for aggregating from 
input into output field and the aggregator
       // for combining already aggregated field
       if (aggs[aggOffset].isNull()) {
         // If the aggregator for aggregating from input into output field is 
null, then we get the value from the
@@ -583,8 +601,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex
         return aggregatorFactory.combine(aggregatedFromSource, 
aggregatedFromCombined);
       }
     } else {
-      // If preserveExistingMetrics flag is not set then we simply get metrics 
from the list of Aggregator, aggs, using the
-      // given aggOffset
+      // If preserveExistingMetrics flag is not set then we simply get metrics 
from the list of Aggregator, aggs,
+      // using the given aggOffset
       return getMetricTypeFunction.apply(aggs[aggOffset]);
     }
   }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
index ae748198aea..baac335f0c4 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
@@ -26,10 +26,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
 import org.apache.druid.frame.testutil.FrameSequenceBuilder;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.CursorFactory;
-import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -48,15 +45,11 @@ public final class TestFrameProcessorUtils
   {
     final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
-            new IncrementalIndexSchema(
-                0,
-                new TimestampSpec("__time", "millis", null),
-                Granularities.NONE,
-                VirtualColumns.EMPTY,
-                DimensionsSpec.builder().useSchemaDiscovery(true).build(),
-                new AggregatorFactory[0],
-                false
-            )
+            IncrementalIndexSchema.builder()
+                                  .withTimestampSpec(new 
TimestampSpec("__time", "millis", null))
+                                  
.withDimensionsSpec(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+                                  .withRollup(false)
+                                  .build()
         )
         .setMaxRowCount(1000)
         .build();
diff --git 
a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
index 80b6d23d4b8..2c570981f65 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
@@ -26,8 +26,6 @@ import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.guice.BuiltInTypesModule;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.segment.column.ColumnType;
@@ -494,18 +492,17 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     long minTimestamp = System.currentTimeMillis();
     IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
-            new IncrementalIndexSchema(
-                minTimestamp,
-                new TimestampSpec(TIME_COL, "millis", null),
-                Granularities.NONE,
-                VirtualColumns.EMPTY,
-                DimensionsSpec.builder()
-                              .setDimensions(ImmutableList.of(new 
AutoTypeColumnSchema(NESTED_COL, ColumnType.STRING)))
-                              .useSchemaDiscovery(true)
-                              .build(),
-                new AggregatorFactory[0],
-                false
-            )
+            IncrementalIndexSchema.builder()
+                                  .withMinTimestamp(minTimestamp)
+                                  .withTimestampSpec(new 
TimestampSpec(TIME_COL, "millis", null))
+                                  .withDimensionsSpec(
+                                      DimensionsSpec.builder()
+                                                    
.setDimensions(ImmutableList.of(new AutoTypeColumnSchema(NESTED_COL, 
ColumnType.STRING)))
+                                                    .useSchemaDiscovery(true)
+                                                    .build()
+                                  )
+                                  .withRollup(false)
+                                  .build()
         )
         .setMaxRowCount(1000)
         .build();
@@ -699,15 +696,16 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
   {
     IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
-            new IncrementalIndexSchema(
-                minTimestamp,
-                new TimestampSpec(TIME_COL, "millis", null),
-                Granularities.NONE,
-                VirtualColumns.EMPTY,
-                DimensionsSpec.builder().useSchemaDiscovery(true).build(),
-                new AggregatorFactory[0],
-                false
-            )
+            IncrementalIndexSchema.builder()
+                                  .withMinTimestamp(minTimestamp)
+                                  .withTimestampSpec(new 
TimestampSpec(TIME_COL, "millis", null))
+                                  .withDimensionsSpec(
+                                      DimensionsSpec.builder()
+                                                    .useSchemaDiscovery(true)
+                                                    .build()
+                                  )
+                                  .withRollup(false)
+                                  .build()
         )
         .setMaxRowCount(1000)
         .build();
diff --git 
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
 
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
index 2e9deab42b4..9fc9fc0f578 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
@@ -26,8 +26,6 @@ import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.guice.BuiltInTypesModule;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.segment.column.ColumnType;
@@ -478,15 +476,16 @@ public class NestedDataColumnIndexerV4Test extends 
InitializedNullHandlingTest
   {
     IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
-            new IncrementalIndexSchema(
-                minTimestamp,
-                new TimestampSpec(TIME_COL, "millis", null),
-                Granularities.NONE,
-                VirtualColumns.EMPTY,
-                DimensionsSpec.builder().useSchemaDiscovery(true).build(),
-                new AggregatorFactory[0],
-                false
-            )
+            IncrementalIndexSchema.builder()
+                                  .withMinTimestamp(minTimestamp)
+                                  .withTimestampSpec(new 
TimestampSpec(TIME_COL, "millis", null))
+                                  .withDimensionsSpec(
+                                      DimensionsSpec.builder()
+                                                    .useSchemaDiscovery(true)
+                                                    .build()
+                                  )
+                                  .withRollup(false)
+                                  .build()
         )
         .setMaxRowCount(1000)
         .build();
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
index 77e0470c548..e1a7319cab1 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
@@ -25,9 +25,12 @@ import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorAndSize;
 import org.apache.druid.query.aggregation.LongMaxAggregator;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.segment.CloserRule;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.easymock.EasyMock;
 import org.junit.Rule;
@@ -69,22 +72,39 @@ public class IncrementalIndexIngestionTest extends 
InitializedNullHandlingTest
   {
     // Prepare the mocks & set close() call count expectation to 1
     Aggregator mockedAggregator = EasyMock.createMock(LongMaxAggregator.class);
+    
EasyMock.expect(mockedAggregator.aggregateWithSize()).andReturn(0L).anyTimes();
     mockedAggregator.close();
     EasyMock.expectLastCall().times(1);
 
-    final IncrementalIndex genericIndex = indexCreator.createIndex(
+
+    EasyMock.replay(mockedAggregator);
+
+    final IncrementalIndex incrementalIndex = indexCreator.createIndex(
         new IncrementalIndexSchema.Builder()
             .withQueryGranularity(Granularities.MINUTE)
-            .withMetrics(new LongMaxAggregatorFactory("max", "max"))
+            .withMetrics(new LongMaxAggregatorFactory("max", "max")
+            {
+              @Override
+              protected Aggregator factorize(ColumnSelectorFactory 
metricFactory, ColumnValueSelector selector)
+              {
+                return mockedAggregator;
+              }
+
+              @Override
+              public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory 
metricFactory)
+              {
+                return new AggregatorAndSize(mockedAggregator, Long.BYTES);
+              }
+            })
             .build()
     );
 
     // This test is specific to the on-heap index
-    if (!(genericIndex instanceof OnheapIncrementalIndex)) {
+    if (!(incrementalIndex instanceof OnheapIncrementalIndex)) {
       return;
     }
 
-    final OnheapIncrementalIndex index = (OnheapIncrementalIndex) genericIndex;
+    final OnheapIncrementalIndex index = (OnheapIncrementalIndex) 
incrementalIndex;
 
     index.add(new MapBasedInputRow(
             0,
@@ -92,11 +112,7 @@ public class IncrementalIndexIngestionTest extends 
InitializedNullHandlingTest
             ImmutableMap.of("billy", 1, "max", 1)
     ));
 
-    // override the aggregators with the mocks
-    index.concurrentGet(0)[0] = mockedAggregator;
-
     // close the indexer and validate the expectations
-    EasyMock.replay(mockedAggregator);
     index.close();
     EasyMock.verify(mockedAggregator);
   }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
index 80c8207ed60..f5779bf7362 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
@@ -28,10 +28,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.guice.BuiltInTypesModule;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.CloserRule;
-import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -80,15 +77,11 @@ public class IncrementalIndexMultiValueSpecTest extends 
InitializedNullHandlingT
             new StringDimensionSchema("string3", 
DimensionSchema.MultiValueHandling.SORTED_SET, true)
         )
     );
-    IncrementalIndexSchema schema = new IncrementalIndexSchema(
-        0,
-        new TimestampSpec("ds", "auto", null),
-        Granularities.ALL,
-        VirtualColumns.EMPTY,
-        dimensionsSpec,
-        new AggregatorFactory[0],
-        false
-    );
+    IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
+                                                          
.withTimestampSpec(new TimestampSpec("ds", "auto", null))
+                                                          
.withDimensionsSpec(dimensionsSpec)
+                                                          .withRollup(false)
+                                                          .build();
     Map<String, Object> map = new HashMap<String, Object>()
     {
       @Override
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 632a848830d..a3060f078a2 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.guava.SettableSupplier;
 import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -50,7 +49,6 @@ import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexCursorFactory;
 import org.apache.druid.segment.TestObjectColumnSelector;
-import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnType;
@@ -620,15 +618,10 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
     // underlying dimension selector.
     // This occurred during schemaless ingestion with sparse dimension values and no explicit null rows, so the
     // conditions are replicated by this test. See https://github.com/apache/druid/pull/10248 for details
-    IncrementalIndexSchema schema = new IncrementalIndexSchema(
-        0,
-        new TimestampSpec("time", "millis", DateTimes.nowUtc()),
-        Granularities.NONE,
-        VirtualColumns.EMPTY,
-        DimensionsSpec.EMPTY,
-        new AggregatorFactory[]{new CountAggregatorFactory("count")},
-        true
-    );
+    IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
+                                                          .withTimestampSpec(new TimestampSpec("time", "millis", DateTimes.nowUtc()))
+                                                          .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
+                                                          .build();
 
     IncrementalIndex index = new OnheapIncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).build();
     index.add(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to