This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 73a644258d6 abstract `IncrementalIndex` cursor stuff to prepare for
using different "views" of the data based on the cursor build spec (#17064)
73a644258d6 is described below
commit 73a644258d69c05baac127c17de90b23e0f79f5e
Author: Clint Wylie <[email protected]>
AuthorDate: Sun Sep 15 16:45:51 2024 -0700
abstract `IncrementalIndex` cursor stuff to prepare for using different
"views" of the data based on the cursor build spec (#17064)
* abstract `IncrementalIndex` cursor stuff to prepare to allow for
possibility of using different "views" of the data based on the cursor build
spec
changes:
* introduce `IncrementalIndexRowSelector` interface to capture how
`IncrementalIndexCursor` and `IncrementalIndexColumnSelectorFactory` read data
* `IncrementalIndex` implements `IncrementalIndexRowSelector`
* move `FactsHolder` interface to separate file
* other minor refactorings
---
.../druid/segment/incremental/FactsHolder.java | 82 ++++
.../segment/incremental/IncrementalIndex.java | 453 ++++++++++-----------
.../IncrementalIndexColumnSelectorFactory.java | 22 +-
.../incremental/IncrementalIndexCursorFactory.java | 6 +-
.../incremental/IncrementalIndexCursorHolder.java | 68 ++--
.../segment/incremental/IncrementalIndexRow.java | 2 +
.../incremental/IncrementalIndexRowSelector.java | 104 +++++
.../incremental/OnheapIncrementalIndex.java | 86 ++--
.../processor/test/TestFrameProcessorUtils.java | 17 +-
.../druid/segment/AutoTypeColumnIndexerTest.java | 44 +-
.../segment/NestedDataColumnIndexerV4Test.java | 21 +-
.../incremental/IncrementalIndexIngestionTest.java | 32 +-
.../IncrementalIndexMultiValueSpecTest.java | 17 +-
.../segment/virtual/ExpressionSelectorsTest.java | 15 +-
14 files changed, 572 insertions(+), 397 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java
b/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java
new file mode 100644
index 00000000000..f7eede10150
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/FactsHolder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import java.util.Comparator;
+import java.util.Iterator;
+
+/**
+ * {@link IncrementalIndexRow} storage interface, a mutable data structure for
building up a set or rows to eventually
+ * persist into an immutable segment
+ *
+ * @see IncrementalIndex for the data processor which constructs {@link
IncrementalIndexRow} to store here
+ */
+public interface FactsHolder
+{
+ /**
+ * @return the previous rowIndex associated with the specified key, or
+ * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for
the key.
+ */
+ int getPriorIndex(IncrementalIndexRow key);
+
+ /**
+ * Get minimum {@link IncrementalIndexRow#getTimestamp()} present in the
facts holder
+ */
+ long getMinTimeMillis();
+
+ /**
+ * Get maximum {@link IncrementalIndexRow#getTimestamp()} present in the
facts holder
+ */
+ long getMaxTimeMillis();
+
+ /**
+ * Get all {@link IncrementalIndex}, depending on the implementation, these
rows may or may not be ordered in the same
+ * order they will be persisted in. Use {@link #persistIterable()} if this
is required.
+ */
+ Iterator<IncrementalIndexRow> iterator(boolean descending);
+
+ /**
+ * Get all {@link IncrementalIndexRow} with {@link
IncrementalIndexRow#getTimestamp()} between the start and end
+ * timestamps specified
+ */
+ Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long
timeStart, long timeEnd);
+
+ /**
+ * Get all row {@link IncrementalIndexRow} 'keys', which is distinct groups
if this is an aggregating facts holder or
+ * just every row present if not
+ */
+ Iterable<IncrementalIndexRow> keySet();
+
+ /**
+ * Get all {@link IncrementalIndexRow} to persist, ordered with {@link
Comparator <IncrementalIndexRow>}
+ */
+ Iterable<IncrementalIndexRow> persistIterable();
+
+ /**
+ * @return the previous rowIndex associated with the specified key, or
+ * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for
the key.
+ */
+ int putIfAbsent(IncrementalIndexRow key, int rowIndex);
+
+ /**
+ * Clear all rows present in the facts holder
+ */
+ void clear();
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index fc2a02c47b7..8adc47f6533 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -105,7 +105,7 @@ import java.util.stream.Collectors;
* {@link IncrementalIndexCursorFactory} are thread-safe, and may be called
concurrently with each other, and with
* the "add" methods. This concurrency model supports real-time queries of the
data in the index.
*/
-public abstract class IncrementalIndex implements Iterable<Row>, Closeable,
ColumnInspector
+public abstract class IncrementalIndex implements IncrementalIndexRowSelector,
ColumnInspector, Iterable<Row>, Closeable
{
/**
* Column selector used at ingestion time for inputs to aggregators.
@@ -255,8 +255,9 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
private final boolean useSchemaDiscovery;
- private final InputRowHolder inputRowHolder = new InputRowHolder();
+ protected final InputRowHolder inputRowHolder = new InputRowHolder();
+ @Nullable
private volatile DateTime maxIngestedEventTime;
/**
@@ -366,8 +367,6 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
);
}
- public abstract FactsHolder getFacts();
-
public abstract boolean canAppendRow();
public abstract String getOutOfRowsReason();
@@ -384,100 +383,11 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException;
- public abstract int getLastRowIndex();
-
- protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);
-
- protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
-
- protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset);
-
- protected abstract double getMetricDoubleValue(int rowOffset, int aggOffset);
-
- protected abstract boolean isNull(int rowOffset, int aggOffset);
-
- static class IncrementalIndexRowResult
- {
- private final IncrementalIndexRow incrementalIndexRow;
- private final List<String> parseExceptionMessages;
-
- IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow,
List<String> parseExceptionMessages)
- {
- this.incrementalIndexRow = incrementalIndexRow;
- this.parseExceptionMessages = parseExceptionMessages;
- }
-
- IncrementalIndexRow getIncrementalIndexRow()
- {
- return incrementalIndexRow;
- }
-
- List<String> getParseExceptionMessages()
- {
- return parseExceptionMessages;
- }
- }
-
- static class AddToFactsResult
- {
- private final int rowCount;
- private final long bytesInMemory;
- private final List<String> parseExceptionMessages;
-
- public AddToFactsResult(
- int rowCount,
- long bytesInMemory,
- List<String> parseExceptionMessages
- )
- {
- this.rowCount = rowCount;
- this.bytesInMemory = bytesInMemory;
- this.parseExceptionMessages = parseExceptionMessages;
- }
-
- int getRowCount()
- {
- return rowCount;
- }
-
- public long getBytesInMemory()
- {
- return bytesInMemory;
- }
-
- public List<String> getParseExceptionMessages()
- {
- return parseExceptionMessages;
- }
- }
-
- public static class InputRowHolder
- {
- @Nullable
- private InputRow row;
- private long rowId = -1;
-
- public void set(final InputRow row)
- {
- this.row = row;
- this.rowId++;
- }
- public void unset()
- {
- this.row = null;
- }
-
- public InputRow getRow()
- {
- return Preconditions.checkNotNull(row, "row");
- }
-
- public long getRowId()
- {
- return rowId;
- }
- }
+ public abstract Iterable<Row> iterableWithPostAggregations(
+ @Nullable List<PostAggregator> postAggs,
+ boolean descending
+ );
public boolean isRollup()
{
@@ -746,23 +656,6 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
);
}
- private static String getSimplifiedEventStringFromRow(InputRow inputRow)
- {
- if (inputRow instanceof MapBasedInputRow) {
- return ((MapBasedInputRow) inputRow).getEvent().toString();
- }
-
- if (inputRow instanceof ListBasedInputRow) {
- return ((ListBasedInputRow) inputRow).asMap().toString();
- }
-
- if (inputRow instanceof TransformedInputRow) {
- InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow();
- return getSimplifiedEventStringFromRow(innerRow);
- }
-
- return inputRow.toString();
- }
private synchronized void updateMaxIngestedTime(DateTime eventTime)
{
@@ -771,6 +664,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
}
}
+ @Override
public boolean isEmpty()
{
return numEntries.get() == 0;
@@ -861,6 +755,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
/**
* Returns the descriptor for a particular dimension.
*/
+ @Override
@Nullable
public DimensionDesc getDimension(String dimension)
{
@@ -869,22 +764,39 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
}
}
- public ColumnValueSelector<?> makeMetricColumnValueSelector(String metric,
IncrementalIndexRowHolder currEntry)
+ @Override
+ @Nullable
+ public MetricDesc getMetric(String metric)
{
- MetricDesc metricDesc = metricDescs.get(metric);
+ return metricDescs.get(metric);
+ }
+
+ @Override
+ public List<OrderBy> getOrdering()
+ {
+ return metadata.getOrdering();
+ }
+
+ public static ColumnValueSelector<?> makeMetricColumnValueSelector(
+ IncrementalIndexRowSelector rowSelector,
+ IncrementalIndexRowHolder currEntry,
+ String metric
+ )
+ {
+ final MetricDesc metricDesc = rowSelector.getMetric(metric);
if (metricDesc == null) {
return NilColumnValueSelector.instance();
}
int metricIndex = metricDesc.getIndex();
switch (metricDesc.getCapabilities().getType()) {
case COMPLEX:
- return new ObjectMetricColumnSelector(metricDesc, currEntry,
metricIndex);
+ return new ObjectMetricColumnSelector(rowSelector, currEntry,
metricDesc);
case LONG:
- return new LongMetricColumnSelector(currEntry, metricIndex);
+ return new LongMetricColumnSelector(rowSelector, currEntry,
metricIndex);
case FLOAT:
- return new FloatMetricColumnSelector(currEntry, metricIndex);
+ return new FloatMetricColumnSelector(rowSelector, currEntry,
metricIndex);
case DOUBLE:
- return new DoubleMetricColumnSelector(currEntry, metricIndex);
+ return new DoubleMetricColumnSelector(rowSelector, currEntry,
metricIndex);
case STRING:
throw new IllegalStateException("String is not a metric column type");
default:
@@ -910,13 +822,6 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
return isEmpty() ? null : DateTimes.utc(getMaxTimeMillis());
}
- @Nullable
- public Integer getDimensionIndex(String dimension)
- {
- DimensionDesc dimSpec = getDimension(dimension);
- return dimSpec == null ? null : dimSpec.getIndex();
- }
-
/**
* Returns names of time and dimension columns, in persist sort order.
Includes {@link ColumnHolder#TIME_COLUMN_NAME}.
*/
@@ -1003,6 +908,49 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
return metadata;
}
+ @Override
+ public Iterator<Row> iterator()
+ {
+ return iterableWithPostAggregations(null, false).iterator();
+ }
+
+ public DateTime getMaxIngestedEventTime()
+ {
+ return maxIngestedEventTime;
+ }
+
+ protected ColumnSelectorFactory makeColumnSelectorFactory(
+ @Nullable final AggregatorFactory agg,
+ final InputRowHolder in
+ )
+ {
+ return makeColumnSelectorFactory(virtualColumns, in, agg);
+ }
+
+ protected final Comparator<IncrementalIndexRow> dimsComparator()
+ {
+ return new IncrementalIndexRowComparator(timePosition, dimensionDescsList);
+ }
+
+
+ private static String getSimplifiedEventStringFromRow(InputRow inputRow)
+ {
+ if (inputRow instanceof MapBasedInputRow) {
+ return ((MapBasedInputRow) inputRow).getEvent().toString();
+ }
+
+ if (inputRow instanceof ListBasedInputRow) {
+ return ((ListBasedInputRow) inputRow).asMap().toString();
+ }
+
+ if (inputRow instanceof TransformedInputRow) {
+ InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow();
+ return getSimplifiedEventStringFromRow(innerRow);
+ }
+
+ return inputRow.toString();
+ }
+
private static AggregatorFactory[]
getCombiningAggregators(AggregatorFactory[] aggregators)
{
AggregatorFactory[] combiningAggregators = new
AggregatorFactory[aggregators.length];
@@ -1012,30 +960,24 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
return combiningAggregators;
}
- @Override
- public Iterator<Row> iterator()
- {
- return iterableWithPostAggregations(null, false).iterator();
- }
-
- public abstract Iterable<Row> iterableWithPostAggregations(
- @Nullable List<PostAggregator> postAggs,
- boolean descending
- );
-
- public DateTime getMaxIngestedEventTime()
+ private static boolean allNull(Object[] dims, int startPosition)
{
- return maxIngestedEventTime;
+ for (int i = startPosition; i < dims.length; i++) {
+ if (dims[i] != null) {
+ return false;
+ }
+ }
+ return true;
}
public static final class DimensionDesc
{
private final int index;
private final String name;
- private final DimensionHandler handler;
- private final DimensionIndexer indexer;
+ private final DimensionHandler<?, ?, ?> handler;
+ private final DimensionIndexer<?, ?, ?> indexer;
- public DimensionDesc(int index, String name, DimensionHandler handler,
boolean useMaxMemoryEstimates)
+ public DimensionDesc(int index, String name, DimensionHandler<?, ?, ?>
handler, boolean useMaxMemoryEstimates)
{
this.index = index;
this.name = name;
@@ -1058,12 +1000,12 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
return indexer.getColumnCapabilities();
}
- public DimensionHandler getHandler()
+ public DimensionHandler<?, ?, ?> getHandler()
{
return handler;
}
- public DimensionIndexer getIndexer()
+ public DimensionIndexer<?, ?, ?> getIndexer()
{
return indexer;
}
@@ -1124,19 +1066,90 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
}
}
- protected ColumnSelectorFactory makeColumnSelectorFactory(
- @Nullable final AggregatorFactory agg,
- final InputRowHolder in
- )
+ public static class AddToFactsResult
{
- return makeColumnSelectorFactory(virtualColumns, in, agg);
+ private final int rowCount;
+ private final long bytesInMemory;
+ private final List<String> parseExceptionMessages;
+
+ public AddToFactsResult(
+ int rowCount,
+ long bytesInMemory,
+ List<String> parseExceptionMessages
+ )
+ {
+ this.rowCount = rowCount;
+ this.bytesInMemory = bytesInMemory;
+ this.parseExceptionMessages = parseExceptionMessages;
+ }
+
+ int getRowCount()
+ {
+ return rowCount;
+ }
+
+ public long getBytesInMemory()
+ {
+ return bytesInMemory;
+ }
+
+ public List<String> getParseExceptionMessages()
+ {
+ return parseExceptionMessages;
+ }
}
- protected final Comparator<IncrementalIndexRow> dimsComparator()
+ public static class InputRowHolder
{
- return new IncrementalIndexRowComparator(timePosition, dimensionDescsList);
+ @Nullable
+ private InputRow row;
+ private long rowId = -1;
+
+ public void set(final InputRow row)
+ {
+ this.row = row;
+ this.rowId++;
+ }
+
+ public void unset()
+ {
+ this.row = null;
+ }
+
+ public InputRow getRow()
+ {
+ return Preconditions.checkNotNull(row, "row");
+ }
+
+ public long getRowId()
+ {
+ return rowId;
+ }
}
+ static class IncrementalIndexRowResult
+ {
+ private final IncrementalIndexRow incrementalIndexRow;
+ private final List<String> parseExceptionMessages;
+
+ IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow,
List<String> parseExceptionMessages)
+ {
+ this.incrementalIndexRow = incrementalIndexRow;
+ this.parseExceptionMessages = parseExceptionMessages;
+ }
+
+ IncrementalIndexRow getIncrementalIndexRow()
+ {
+ return incrementalIndexRow;
+ }
+
+ List<String> getParseExceptionMessages()
+ {
+ return parseExceptionMessages;
+ }
+ }
+
+
@VisibleForTesting
static final class IncrementalIndexRowComparator implements
Comparator<IncrementalIndexRow>
{
@@ -1207,57 +1220,19 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
}
}
- private static boolean allNull(Object[] dims, int startPosition)
- {
- for (int i = startPosition; i < dims.length; i++) {
- if (dims[i] != null) {
- return false;
- }
- }
- return true;
- }
-
- public interface FactsHolder
- {
- /**
- * @return the previous rowIndex associated with the specified key, or
- * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for
the key.
- */
- int getPriorIndex(IncrementalIndexRow key);
-
- long getMinTimeMillis();
-
- long getMaxTimeMillis();
-
- Iterator<IncrementalIndexRow> iterator(boolean descending);
-
- Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long
timeStart, long timeEnd);
-
- Iterable<IncrementalIndexRow> keySet();
-
- /**
- * Get all {@link IncrementalIndexRow} to persist, ordered with {@link
Comparator<IncrementalIndexRow>}
- *
- * @return
- */
- Iterable<IncrementalIndexRow> persistIterable();
-
- /**
- * @return the previous rowIndex associated with the specified key, or
- * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for
the key.
- */
- int putIfAbsent(IncrementalIndexRow key, int rowIndex);
-
- void clear();
- }
-
- private final class LongMetricColumnSelector implements LongColumnSelector
+ private static final class LongMetricColumnSelector implements
LongColumnSelector
{
+ private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
- public LongMetricColumnSelector(IncrementalIndexRowHolder currEntry, int
metricIndex)
+ public LongMetricColumnSelector(
+ IncrementalIndexRowSelector rowSelector,
+ IncrementalIndexRowHolder currEntry,
+ int metricIndex
+ )
{
+ this.rowSelector = rowSelector;
this.currEntry = currEntry;
this.metricIndex = metricIndex;
}
@@ -1265,119 +1240,131 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
@Override
public long getLong()
{
- assert NullHandling.replaceWithDefault() || !isNull();
- return getMetricLongValue(currEntry.get().getRowIndex(), metricIndex);
+ return rowSelector.getMetricLongValue(currEntry.get().getRowIndex(),
metricIndex);
}
@Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ public boolean isNull()
{
- inspector.visit("index", IncrementalIndex.this);
+ return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
}
@Override
- public boolean isNull()
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
- return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(),
metricIndex);
+ inspector.visit("index", rowSelector);
}
}
- private final class ObjectMetricColumnSelector extends ObjectColumnSelector
+ private static final class FloatMetricColumnSelector implements
FloatColumnSelector
{
+ private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
- private Class classOfObject;
- public ObjectMetricColumnSelector(
- MetricDesc metricDesc,
+ public FloatMetricColumnSelector(
+ IncrementalIndexRowSelector rowSelector,
IncrementalIndexRowHolder currEntry,
int metricIndex
)
{
this.currEntry = currEntry;
+ this.rowSelector = rowSelector;
this.metricIndex = metricIndex;
- classOfObject =
ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
}
- @Nullable
@Override
- public Object getObject()
+ public float getFloat()
{
- return getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex);
+ return rowSelector.getMetricFloatValue(currEntry.get().getRowIndex(),
metricIndex);
}
@Override
- public Class classOfObject()
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
- return classOfObject;
+ inspector.visit("index", rowSelector);
}
@Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ public boolean isNull()
{
- inspector.visit("index", IncrementalIndex.this);
+ return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
}
}
- private final class FloatMetricColumnSelector implements FloatColumnSelector
+ private static final class DoubleMetricColumnSelector implements
DoubleColumnSelector
{
+ private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
- public FloatMetricColumnSelector(IncrementalIndexRowHolder currEntry, int
metricIndex)
+ public DoubleMetricColumnSelector(
+ IncrementalIndexRowSelector rowSelector,
+ IncrementalIndexRowHolder currEntry,
+ int metricIndex
+ )
{
this.currEntry = currEntry;
+ this.rowSelector = rowSelector;
this.metricIndex = metricIndex;
}
@Override
- public float getFloat()
+ public double getDouble()
{
assert NullHandling.replaceWithDefault() || !isNull();
- return getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex);
+ return rowSelector.getMetricDoubleValue(currEntry.get().getRowIndex(),
metricIndex);
}
@Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ public boolean isNull()
{
- inspector.visit("index", IncrementalIndex.this);
+ return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
}
@Override
- public boolean isNull()
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
- return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(),
metricIndex);
+ inspector.visit("index", rowSelector);
}
}
- private final class DoubleMetricColumnSelector implements
DoubleColumnSelector
+ private static final class ObjectMetricColumnSelector extends
ObjectColumnSelector
{
+ private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
+ private final Class<?> classOfObject;
- public DoubleMetricColumnSelector(IncrementalIndexRowHolder currEntry, int
metricIndex)
+ public ObjectMetricColumnSelector(
+ IncrementalIndexRowSelector rowSelector,
+ IncrementalIndexRowHolder currEntry,
+ MetricDesc metricDesc
+ )
{
this.currEntry = currEntry;
- this.metricIndex = metricIndex;
+ this.rowSelector = rowSelector;
+ this.metricIndex = metricDesc.getIndex();
+ this.classOfObject =
ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
}
+ @Nullable
@Override
- public double getDouble()
+ public Object getObject()
{
- assert NullHandling.replaceWithDefault() || !isNull();
- return getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex);
+ return rowSelector.getMetricObjectValue(currEntry.get().getRowIndex(),
metricIndex);
}
@Override
- public boolean isNull()
+ public Class<?> classOfObject()
{
- return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(),
metricIndex);
+ return classOfObject;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
- inspector.visit("index", IncrementalIndex.this);
+ inspector.visit("index", rowSelector);
}
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
index 86e8c6690c2..9d60edef044 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexColumnSelectorFactory.java
@@ -43,29 +43,29 @@ import javax.annotation.Nullable;
class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory,
RowIdSupplier
{
private final ColumnInspector snapshotColumnInspector;
- private final IncrementalIndex index;
private final VirtualColumns virtualColumns;
private final Order timeOrder;
private final IncrementalIndexRowHolder rowHolder;
+ private final IncrementalIndexRowSelector rowSelector;
IncrementalIndexColumnSelectorFactory(
- IncrementalIndex index,
+ IncrementalIndexRowSelector rowSelector,
VirtualColumns virtualColumns,
Order timeOrder,
IncrementalIndexRowHolder rowHolder
)
{
- this.index = index;
this.virtualColumns = virtualColumns;
this.timeOrder = timeOrder;
this.rowHolder = rowHolder;
+ this.rowSelector = rowSelector;
this.snapshotColumnInspector = new ColumnInspector()
{
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
- return IncrementalIndexCursorFactory.snapshotColumnCapabilities(index,
column);
+ return
IncrementalIndexCursorFactory.snapshotColumnCapabilities(rowSelector, column);
}
};
}
@@ -87,13 +87,13 @@ class IncrementalIndexColumnSelectorFactory implements
ColumnSelectorFactory, Ro
if (dimension.equals(ColumnHolder.TIME_COLUMN_NAME) && timeOrder !=
Order.NONE) {
return new SingleScanTimeDimensionSelector(
- makeColumnValueSelector(dimension),
+ makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
extractionFn,
timeOrder
);
}
- final IncrementalIndex.DimensionDesc dimensionDesc =
index.getDimension(dimensionSpec.getDimension());
+ final IncrementalIndex.DimensionDesc dimensionDesc =
rowSelector.getDimension(dimensionSpec.getDimension());
if (dimensionDesc == null) {
// not a dimension, column may be a metric
ColumnCapabilities capabilities = getColumnCapabilities(dimension);
@@ -122,19 +122,17 @@ class IncrementalIndexColumnSelectorFactory implements
ColumnSelectorFactory, Ro
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeColumnValueSelector(columnName, this);
}
-
- if (columnName.equals(ColumnHolder.TIME_COLUMN_NAME)) {
+ if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
return rowHolder;
}
- final Integer dimIndex = index.getDimensionIndex(columnName);
- if (dimIndex != null) {
- final IncrementalIndex.DimensionDesc dimensionDesc =
index.getDimension(columnName);
+ final IncrementalIndex.DimensionDesc dimensionDesc =
rowSelector.getDimension(columnName);
+ if (dimensionDesc != null) {
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeColumnValueSelector(rowHolder, dimensionDesc);
}
- return index.makeMetricColumnValueSelector(columnName, rowHolder);
+ return IncrementalIndex.makeMetricColumnValueSelector(rowSelector,
rowHolder, columnName);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
index e034f820dfb..b73a7b682a3 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
@@ -99,9 +99,9 @@ public class IncrementalIndexCursorFactory implements
CursorFactory
return snapshotColumnCapabilities(index, column);
}
- static ColumnCapabilities snapshotColumnCapabilities(IncrementalIndex index,
String column)
+ static ColumnCapabilities
snapshotColumnCapabilities(IncrementalIndexRowSelector selector, String column)
{
- IncrementalIndex.DimensionDesc desc = index.getDimension(column);
+ IncrementalIndex.DimensionDesc desc = selector.getDimension(column);
// nested column indexer is a liar, and behaves like any type if it only
processes unnested literals of a single
// type, so force it to use nested column type
if (desc != null && desc.getIndexer() instanceof
NestedDataColumnIndexerV4) {
@@ -122,7 +122,7 @@ public class IncrementalIndexCursorFactory implements
CursorFactory
// multi-valuedness at cursor creation time, instead of the latest state,
and getSnapshotColumnCapabilities could
// be removed.
return ColumnCapabilitiesImpl.snapshot(
- index.getColumnCapabilities(column),
+ selector.getColumnCapabilities(column),
COERCE_LOGIC
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
index 02c09398d8e..72ec9116d1f 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java
@@ -23,52 +23,46 @@ import com.google.common.collect.Iterators;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
-import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.Cursors;
-import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.filter.ValueMatchers;
-import org.joda.time.Interval;
-import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class IncrementalIndexCursorHolder implements CursorHolder
{
- private final IncrementalIndex index;
+ private final IncrementalIndexRowSelector rowSelector;
private final CursorBuildSpec spec;
private final List<OrderBy> ordering;
public IncrementalIndexCursorHolder(
- IncrementalIndex index,
+ IncrementalIndexRowSelector rowSelector,
CursorBuildSpec spec
)
{
- this.index = index;
+ this.rowSelector = rowSelector;
this.spec = spec;
- if (index.timePosition == 0) {
+ List<OrderBy> ordering = rowSelector.getOrdering();
+ if (Cursors.getTimeOrdering(ordering) != Order.NONE) {
if (Cursors.preferDescendingTimeOrdering(spec)) {
this.ordering = Cursors.descendingTimeOrder();
} else {
this.ordering = Cursors.ascendingTimeOrder();
}
} else {
- // In principle, we could report a sort order here for certain types of
fact holders; for example the
- // RollupFactsHolder would be sorted by dimensions. However, this is
left for future work.
- this.ordering = Collections.emptyList();
+ this.ordering = ordering;
}
}
@Override
public Cursor asCursor()
{
- if (index.isEmpty()) {
+ if (rowSelector.isEmpty()) {
return null;
}
@@ -76,13 +70,10 @@ public class IncrementalIndexCursorHolder implements
CursorHolder
spec.getQueryMetrics().vectorized(false);
}
-
return new IncrementalIndexCursor(
- index,
- spec.getVirtualColumns(),
- Cursors.getTimeOrdering(ordering),
- spec.getFilter(),
- spec.getInterval()
+ rowSelector,
+ spec,
+ Cursors.getTimeOrdering(ordering)
);
}
@@ -94,11 +85,11 @@ public class IncrementalIndexCursorHolder implements
CursorHolder
static class IncrementalIndexCursor implements Cursor
{
- private IncrementalIndexRowHolder currEntry;
+ private final IncrementalIndexRowSelector rowSelector;
+ private final IncrementalIndexRowHolder currEntry;
private final ColumnSelectorFactory columnSelectorFactory;
private final ValueMatcher filterMatcher;
private final int maxRowIndex;
- private final IncrementalIndex.FactsHolder facts;
private Iterator<IncrementalIndexRow> baseIter;
private Iterable<IncrementalIndexRow> cursorIterable;
private boolean emptyRange;
@@ -106,30 +97,31 @@ public class IncrementalIndexCursorHolder implements
CursorHolder
private boolean done;
IncrementalIndexCursor(
- IncrementalIndex index,
- VirtualColumns virtualColumns,
- Order timeOrder,
- @Nullable Filter filter,
- Interval actualInterval
+ IncrementalIndexRowSelector index,
+ CursorBuildSpec buildSpec,
+ Order timeOrder
)
{
currEntry = new IncrementalIndexRowHolder();
- columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(
- index,
- virtualColumns,
- timeOrder,
- currEntry
- );
// Set maxRowIndex before creating the filterMatcher. See
https://github.com/apache/druid/pull/6340
maxRowIndex = index.getLastRowIndex();
- filterMatcher = filter == null ? ValueMatchers.allTrue() :
filter.makeMatcher(columnSelectorFactory);
numAdvanced = -1;
- facts = index.getFacts();
- cursorIterable = facts.timeRangeIterable(
+
+ rowSelector = index;
+ cursorIterable = rowSelector.getFacts().timeRangeIterable(
timeOrder == Order.DESCENDING,
- actualInterval.getStartMillis(),
- actualInterval.getEndMillis()
+ buildSpec.getInterval().getStartMillis(),
+ buildSpec.getInterval().getEndMillis()
);
+ columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(
+ rowSelector,
+ buildSpec.getVirtualColumns(),
+ timeOrder,
+ currEntry
+ );
+ filterMatcher = buildSpec.getFilter() == null
+ ? ValueMatchers.allTrue()
+ :
buildSpec.getFilter().makeMatcher(columnSelectorFactory);
emptyRange = !cursorIterable.iterator().hasNext();
reset();
@@ -152,7 +144,7 @@ public class IncrementalIndexCursorHolder implements
CursorHolder
while (baseIter.hasNext()) {
BaseQuery.checkInterrupted();
- IncrementalIndexRow entry = baseIter.next();
+ final IncrementalIndexRow entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
continue;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
index 89e94961f6b..2e817b993ce 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
@@ -144,6 +144,8 @@ public final class IncrementalIndexRow
{
if (input == null || (input.getClass().isArray() &&
Array.getLength(input) == 0)) {
return Collections.singletonList("null");
+ } else if (input instanceof int[]) {
+ return Arrays.toString((int[]) input);
}
return Collections.singletonList(input);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java
new file mode 100644
index 00000000000..bafa127e881
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowSelector.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.segment.ColumnInspector;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Interface that abstracts selecting data from a {@link FactsHolder}
+ */
+public interface IncrementalIndexRowSelector extends ColumnInspector
+{
+ /**
+ * get {@link IncrementalIndex.DimensionDesc} for the specified column, if
available, which provides access to things
+ * like {@link org.apache.druid.segment.DimensionIndexer} and {@link
org.apache.druid.segment.DimensionHandler} as
+ * well as column capabilities and position within the row
+ */
+ @Nullable
+ IncrementalIndex.DimensionDesc getDimension(String columnName);
+
+ /**
+ * Get {@link IncrementalIndex.MetricDesc} which provides column
capabilities and position in the aggregators section
+ * of the row
+ */
+ @Nullable
+ IncrementalIndex.MetricDesc getMetric(String s);
+
+ /**
+ * Ordering for the data in the facts table
+ */
+ List<OrderBy> getOrdering();
+
+ /**
+ * Are there any {@link IncrementalIndexRow} stored in the {@link
FactsHolder}?
+ */
+ boolean isEmpty();
+
+ /**
+ * Get the {@link FactsHolder} containing all of the {@link
IncrementalIndexRow} backing this selector
+ */
+ FactsHolder getFacts();
+
+ /**
+ * Highest value {@link IncrementalIndexRow#getRowIndex()} available in this
selector. Note that these values do not
+ * reflect the position of the row in the {@link FactsHolder}, rather just
the order in which they were processed
+ */
+ int getLastRowIndex();
+
+ /**
+ * @param rowOffset row to get float aggregator value
+ * @param aggOffset position of the aggregator in the aggregators array of
the data schema
+ * @return float value of the metric
+ */
+ float getMetricFloatValue(int rowOffset, int aggOffset);
+
+ /**
+ * @param rowOffset row to get long aggregator value
+ * @param aggOffset position of the aggregator in the aggregators array of
the data schema
+ * @return long value of the aggregator for this row
+ */
+ long getMetricLongValue(int rowOffset, int aggOffset);
+
+ /**
+ * @param rowOffset row to get double aggregator value
+ * @param aggOffset position of the aggregator in the aggregators array of
the data schema
+ * @return double value of the aggregator for this row
+ */
+ double getMetricDoubleValue(int rowOffset, int aggOffset);
+
+ /**
+ * @param rowOffset row to get long aggregator value
+ * @param aggOffset position of the aggregator in the aggregators array of
the data schema
+ * @return long value of the aggregator for this row
+ */
+ @Nullable
+ Object getMetricObjectValue(int rowOffset, int aggOffset);
+
+ /**
+ * @param rowOffset row to check for a aggregator value
+ * @param aggOffset position of the aggregator in the aggregators array of
the data schema
+ * @return is the value null for this row?
+ */
+ boolean isNull(int rowOffset, int aggOffset);
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index b5e580f44f2..8c554e016fc 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -155,7 +155,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
} else {
this.facts = new PlainNonTimeOrderedFactsHolder(dimsComparator());
}
- maxBytesPerRowForAggregators =
+ this.maxBytesPerRowForAggregators =
useMaxMemoryEstimates ?
getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0;
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
}
@@ -252,14 +252,15 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
) throws IndexSizeExceededException
{
final List<String> parseExceptionMessages = new ArrayList<>();
+ final AtomicLong totalSizeInBytes = getBytesInMemory();
+
final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
final AggregatorFactory[] metrics = getMetrics();
final AtomicInteger numEntries = getNumEntries();
- final AtomicLong totalSizeInBytes = getBytesInMemory();
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
- aggs = concurrentGet(priorIndex);
+ aggs = aggregators.get(priorIndex);
long aggSizeDelta = doAggregate(metrics, aggs, inputRowHolder,
parseExceptionMessages);
totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta);
} else {
@@ -272,7 +273,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
aggSizeForRow += doAggregate(metrics, aggs, inputRowHolder,
parseExceptionMessages);
final int rowIndex = indexIncrement.getAndIncrement();
- concurrentSet(rowIndex, aggs);
+ aggregators.put(rowIndex, aggs);
// Last ditch sanity checks
if ((numEntries.get() >= maxRowCount || totalSizeInBytes.get() >=
maxBytesInMemory)
@@ -363,6 +364,18 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
InputRowHolder inputRowHolder,
List<String> parseExceptionsHolder
)
+ {
+ return doAggregate(metrics, aggs, inputRowHolder, parseExceptionsHolder,
useMaxMemoryEstimates, preserveExistingMetrics);
+ }
+
+ private static long doAggregate(
+ AggregatorFactory[] metrics,
+ Aggregator[] aggs,
+ InputRowHolder inputRowHolder,
+ List<String> parseExceptionsHolder,
+ boolean useMaxMemoryEstimates,
+ boolean preserveExistingMetrics
+ )
{
long totalIncrementalBytes = 0L;
for (int i = 0; i < metrics.length; i++) {
@@ -418,17 +431,6 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
}
}
- protected Aggregator[] concurrentGet(int offset)
- {
- // All get operations should be fine
- return aggregators.get(offset);
- }
-
- protected void concurrentSet(int offset, Aggregator[] value)
- {
- aggregators.put(offset, value);
- }
-
@Override
public boolean canAppendRow()
{
@@ -459,42 +461,53 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
return outOfRowsReason;
}
- protected Aggregator[] getAggsForRow(int rowOffset)
- {
- return concurrentGet(rowOffset);
- }
-
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
- return ((Number) getMetricHelper(getMetricAggs(),
concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
+ return ((Number) getMetricHelper(
+ getMetricAggs(),
+ aggregators.get(rowOffset),
+ aggOffset,
+ Aggregator::getFloat
+ )).floatValue();
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
- return ((Number) getMetricHelper(getMetricAggs(),
concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
+ return ((Number) getMetricHelper(
+ getMetricAggs(),
+ aggregators.get(rowOffset),
+ aggOffset,
+ Aggregator::getLong
+ )).longValue();
}
@Override
- public Object getMetricObjectValue(int rowOffset, int aggOffset)
+ public double getMetricDoubleValue(int rowOffset, int aggOffset)
{
- return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset),
aggOffset, Aggregator::get);
+ return ((Number) getMetricHelper(
+ getMetricAggs(),
+ aggregators.get(rowOffset),
+ aggOffset,
+ Aggregator::getDouble
+ )).doubleValue();
}
@Override
- protected double getMetricDoubleValue(int rowOffset, int aggOffset)
+ public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
- return ((Number) getMetricHelper(getMetricAggs(),
concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
+ return getMetricHelper(getMetricAggs(), aggregators.get(rowOffset),
aggOffset, Aggregator::get);
}
@Override
public boolean isNull(int rowOffset, int aggOffset)
{
+ final Aggregator[] aggs = aggregators.get(rowOffset);
if (preserveExistingMetrics) {
- return concurrentGet(rowOffset)[aggOffset].isNull() &&
concurrentGet(rowOffset)[aggOffset + getMetricAggs().length].isNull();
+ return aggs[aggOffset].isNull() && aggs[aggOffset +
getMetricAggs().length].isNull();
} else {
- return concurrentGet(rowOffset)[aggOffset].isNull();
+ return aggs[aggOffset].isNull();
}
}
@@ -535,7 +548,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
theVals.put(dimensionName, rowVals);
}
- Aggregator[] aggs = getAggsForRow(rowOffset);
+ Aggregator[] aggs = aggregators.get(rowOffset);
int aggLength = preserveExistingMetrics ? aggs.length / 2 :
aggs.length;
for (int i = 0; i < aggLength; ++i) {
theVals.put(metrics[i].getName(), getMetricHelper(metrics,
aggs, i, Aggregator::get));
@@ -560,11 +573,16 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
* for aggregating from input into output field and the aggregator for
combining already aggregated field, as needed
*/
@Nullable
- private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[]
aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+ private <T> Object getMetricHelper(
+ AggregatorFactory[] metrics,
+ Aggregator[] aggs,
+ int aggOffset,
+ Function<Aggregator, T> getMetricTypeFunction
+ )
{
if (preserveExistingMetrics) {
- // Since the preserveExistingMetrics flag is set, we will have to check
and possibly retrieve the aggregated values
- // from two aggregators, the aggregator for aggregating from input into
output field and the aggregator
+ // Since the preserveExistingMetrics flag is set, we will have to check
and possibly retrieve the aggregated
+ // values from two aggregators, the aggregator for aggregating from
input into output field and the aggregator
// for combining already aggregated field
if (aggs[aggOffset].isNull()) {
// If the aggregator for aggregating from input into output field is
null, then we get the value from the
@@ -583,8 +601,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex
return aggregatorFactory.combine(aggregatedFromSource,
aggregatedFromCombined);
}
} else {
- // If preserveExistingMetrics flag is not set then we simply get metrics
from the list of Aggregator, aggs, using the
- // given aggOffset
+ // If preserveExistingMetrics flag is not set then we simply get metrics
from the list of Aggregator, aggs,
+ // using the given aggOffset
return getMetricTypeFunction.apply(aggs[aggOffset]);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
index ae748198aea..baac335f0c4 100644
---
a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
+++
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
@@ -26,10 +26,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.CursorFactory;
-import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -48,15 +45,11 @@ public final class TestFrameProcessorUtils
{
final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
- new IncrementalIndexSchema(
- 0,
- new TimestampSpec("__time", "millis", null),
- Granularities.NONE,
- VirtualColumns.EMPTY,
- DimensionsSpec.builder().useSchemaDiscovery(true).build(),
- new AggregatorFactory[0],
- false
- )
+ IncrementalIndexSchema.builder()
+ .withTimestampSpec(new
TimestampSpec("__time", "millis", null))
+
.withDimensionsSpec(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+ .withRollup(false)
+ .build()
)
.setMaxRowCount(1000)
.build();
diff --git
a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
index 80b6d23d4b8..2c570981f65 100644
---
a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
@@ -26,8 +26,6 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BuiltInTypesModule;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
@@ -494,18 +492,17 @@ public class AutoTypeColumnIndexerTest extends
InitializedNullHandlingTest
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
- new IncrementalIndexSchema(
- minTimestamp,
- new TimestampSpec(TIME_COL, "millis", null),
- Granularities.NONE,
- VirtualColumns.EMPTY,
- DimensionsSpec.builder()
- .setDimensions(ImmutableList.of(new
AutoTypeColumnSchema(NESTED_COL, ColumnType.STRING)))
- .useSchemaDiscovery(true)
- .build(),
- new AggregatorFactory[0],
- false
- )
+ IncrementalIndexSchema.builder()
+ .withMinTimestamp(minTimestamp)
+ .withTimestampSpec(new
TimestampSpec(TIME_COL, "millis", null))
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+
.setDimensions(ImmutableList.of(new AutoTypeColumnSchema(NESTED_COL,
ColumnType.STRING)))
+ .useSchemaDiscovery(true)
+ .build()
+ )
+ .withRollup(false)
+ .build()
)
.setMaxRowCount(1000)
.build();
@@ -699,15 +696,16 @@ public class AutoTypeColumnIndexerTest extends
InitializedNullHandlingTest
{
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
- new IncrementalIndexSchema(
- minTimestamp,
- new TimestampSpec(TIME_COL, "millis", null),
- Granularities.NONE,
- VirtualColumns.EMPTY,
- DimensionsSpec.builder().useSchemaDiscovery(true).build(),
- new AggregatorFactory[0],
- false
- )
+ IncrementalIndexSchema.builder()
+ .withMinTimestamp(minTimestamp)
+ .withTimestampSpec(new
TimestampSpec(TIME_COL, "millis", null))
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+ .useSchemaDiscovery(true)
+ .build()
+ )
+ .withRollup(false)
+ .build()
)
.setMaxRowCount(1000)
.build();
diff --git
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
index 2e9deab42b4..9fc9fc0f578 100644
---
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
+++
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java
@@ -26,8 +26,6 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BuiltInTypesModule;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
@@ -478,15 +476,16 @@ public class NestedDataColumnIndexerV4Test extends
InitializedNullHandlingTest
{
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
- new IncrementalIndexSchema(
- minTimestamp,
- new TimestampSpec(TIME_COL, "millis", null),
- Granularities.NONE,
- VirtualColumns.EMPTY,
- DimensionsSpec.builder().useSchemaDiscovery(true).build(),
- new AggregatorFactory[0],
- false
- )
+ IncrementalIndexSchema.builder()
+ .withMinTimestamp(minTimestamp)
+ .withTimestampSpec(new
TimestampSpec(TIME_COL, "millis", null))
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+ .useSchemaDiscovery(true)
+ .build()
+ )
+ .withRollup(false)
+ .build()
)
.setMaxRowCount(1000)
.build();
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
index 77e0470c548..e1a7319cab1 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
@@ -25,9 +25,12 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.LongMaxAggregator;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.segment.CloserRule;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Rule;
@@ -69,22 +72,39 @@ public class IncrementalIndexIngestionTest extends
InitializedNullHandlingTest
{
// Prepare the mocks & set close() call count expectation to 1
Aggregator mockedAggregator = EasyMock.createMock(LongMaxAggregator.class);
+
EasyMock.expect(mockedAggregator.aggregateWithSize()).andReturn(0L).anyTimes();
mockedAggregator.close();
EasyMock.expectLastCall().times(1);
- final IncrementalIndex genericIndex = indexCreator.createIndex(
+
+ EasyMock.replay(mockedAggregator);
+
+ final IncrementalIndex incrementalIndex = indexCreator.createIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.MINUTE)
- .withMetrics(new LongMaxAggregatorFactory("max", "max"))
+ .withMetrics(new LongMaxAggregatorFactory("max", "max")
+ {
+ @Override
+ protected Aggregator factorize(ColumnSelectorFactory
metricFactory, ColumnValueSelector selector)
+ {
+ return mockedAggregator;
+ }
+
+ @Override
+ public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory
metricFactory)
+ {
+ return new AggregatorAndSize(mockedAggregator, Long.BYTES);
+ }
+ })
.build()
);
// This test is specific to the on-heap index
- if (!(genericIndex instanceof OnheapIncrementalIndex)) {
+ if (!(incrementalIndex instanceof OnheapIncrementalIndex)) {
return;
}
- final OnheapIncrementalIndex index = (OnheapIncrementalIndex) genericIndex;
+ final OnheapIncrementalIndex index = (OnheapIncrementalIndex)
incrementalIndex;
index.add(new MapBasedInputRow(
0,
@@ -92,11 +112,7 @@ public class IncrementalIndexIngestionTest extends
InitializedNullHandlingTest
ImmutableMap.of("billy", 1, "max", 1)
));
- // override the aggregators with the mocks
- index.concurrentGet(0)[0] = mockedAggregator;
-
// close the indexer and validate the expectations
- EasyMock.replay(mockedAggregator);
index.close();
EasyMock.verify(mockedAggregator);
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
index 80c8207ed60..f5779bf7362 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
@@ -28,10 +28,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BuiltInTypesModule;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.CloserRule;
-import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
@@ -80,15 +77,11 @@ public class IncrementalIndexMultiValueSpecTest extends
InitializedNullHandlingT
new StringDimensionSchema("string3",
DimensionSchema.MultiValueHandling.SORTED_SET, true)
)
);
- IncrementalIndexSchema schema = new IncrementalIndexSchema(
- 0,
- new TimestampSpec("ds", "auto", null),
- Granularities.ALL,
- VirtualColumns.EMPTY,
- dimensionsSpec,
- new AggregatorFactory[0],
- false
- );
+ IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
+
.withTimestampSpec(new TimestampSpec("ds", "auto", null))
+
.withDimensionsSpec(dimensionsSpec)
+ .withRollup(false)
+ .build();
Map<String, Object> map = new HashMap<String, Object>()
{
@Override
diff --git
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 632a848830d..a3060f078a2 100644
---
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -50,7 +49,6 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.TestObjectColumnSelector;
-import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
@@ -620,15 +618,10 @@ public class ExpressionSelectorsTest extends
InitializedNullHandlingTest
// underlying dimension selector.
// This occurred during schemaless ingestion with spare dimension values
and no explicit null rows, so the
// conditions are replicated by this test. See
https://github.com/apache/druid/pull/10248 for details
- IncrementalIndexSchema schema = new IncrementalIndexSchema(
- 0,
- new TimestampSpec("time", "millis", DateTimes.nowUtc()),
- Granularities.NONE,
- VirtualColumns.EMPTY,
- DimensionsSpec.EMPTY,
- new AggregatorFactory[]{new CountAggregatorFactory("count")},
- true
- );
+ IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
+
.withTimestampSpec(new TimestampSpec("time", "millis", DateTimes.nowUtc()))
+ .withMetrics(new
AggregatorFactory[]{new CountAggregatorFactory("count")})
+ .build();
IncrementalIndex index = new
OnheapIncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).build();
index.add(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]