This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0516d0dae43 simplify IncrementalIndex since group-by v1 has been
removed (#15448)
0516d0dae43 is described below
commit 0516d0dae432058848806e49e71ac73333bf751c
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Nov 29 14:46:16 2023 -0800
simplify IncrementalIndex since group-by v1 has been removed (#15448)
---
.../benchmark/GroupByTypeInterfaceBenchmark.java | 1 -
.../IncrementalIndexRowTypeBenchmark.java | 1 -
.../druid/benchmark/query/GroupByBenchmark.java | 1 -
.../DistinctCountGroupByQueryTest.java | 1 -
.../org/apache/druid/indexer/InputRowSerde.java | 2 +-
.../incremental/AppendableIndexBuilder.java | 15 --
.../segment/incremental/IncrementalIndex.java | 284 +--------------------
.../segment/incremental/IncrementalIndexRow.java | 2 +-
.../incremental/OnheapIncrementalIndex.java | 249 +++++++++++++++---
.../query/aggregation/AggregationTestHelper.java | 8 -
.../aggregation/StringColumnAggregationTest.java | 1 -
.../query/aggregation/mean/SimpleTestIndex.java | 1 -
...GroupByLimitPushDownInsufficientBufferTest.java | 1 -
.../GroupByLimitPushDownMultiNodeMergeTest.java | 1 -
.../query/groupby/GroupByMultiSegmentTest.java | 1 -
.../groupby/GroupByQueryRunnerFactoryTest.java | 1 -
.../query/groupby/NestedQueryPushDownTest.java | 1 -
.../incremental/IncrementalIndexCreator.java | 7 +-
.../segment/incremental/IncrementalIndexTest.java | 9 +-
.../OnheapIncrementalIndexBenchmark.java | 11 +-
20 files changed, 234 insertions(+), 364 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 2d902c12163..0fe549834f0 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -392,7 +392,6 @@ public class GroupByTypeInterfaceBenchmark
{
return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
- .setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.build();
}
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
index 9687a888d91..395f9d6d7c7 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
@@ -135,7 +135,6 @@ public class IncrementalIndexRowTypeBenchmark
{
return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(aggs)
- .setDeserializeComplexMetrics(false)
.setMaxRowCount(rowsPerSegment)
.build();
}
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index d355dd2d005..efb6bed6cb6 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -608,7 +608,6 @@ public class GroupByBenchmark
.withRollup(withRollup)
.build()
)
- .setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.build();
}
diff --git
a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
index c7946255b07..eaef749c324 100644
---
a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
+++
b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
@@ -88,7 +88,6 @@ public class DistinctCountGroupByQueryTest extends
InitializedNullHandlingTest
.withMetrics(new CountAggregatorFactory("cnt"))
.build()
)
- .setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.build();
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
index 7be4da386d8..22b140222c7 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
@@ -330,7 +330,7 @@ public class InputRowSerde
writeString(k, out);
try (Aggregator agg = aggFactory.factorize(
- IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY,
aggFactory, supplier, true)
+ IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY,
aggFactory, supplier)
)) {
try {
agg.aggregate();
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index aa739b3f744..777c4cc1fd7 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -29,9 +29,6 @@ public abstract class AppendableIndexBuilder
{
@Nullable
protected IncrementalIndexSchema incrementalIndexSchema = null;
- protected boolean deserializeComplexMetrics = true;
- protected boolean concurrentEventAdd = false;
- protected boolean sortFacts = true;
protected int maxRowCount = 0;
protected long maxBytesInMemory = 0;
// When set to true, for any row that already has metric (with the same name
defined in metricSpec),
@@ -88,18 +85,6 @@ public abstract class AppendableIndexBuilder
return this;
}
- public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean
deserializeComplexMetrics)
- {
- this.deserializeComplexMetrics = deserializeComplexMetrics;
- return this;
- }
-
- public AppendableIndexBuilder setConcurrentEventAdd(final boolean
concurrentEventAdd)
- {
- this.concurrentEventAdd = concurrentEventAdd;
- return this;
- }
-
public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
{
this.maxRowCount = maxRowCount;
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index db72ed37171..eca175267a7 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -25,7 +25,6 @@ import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
@@ -84,23 +83,15 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Comparator;
-import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
public abstract class IncrementalIndex implements Iterable<Row>, Closeable,
ColumnInspector
{
@@ -109,22 +100,19 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
*
* @param agg the aggregator
* @param in ingestion-time input row supplier
- * @param deserializeComplexMetrics whether complex objects should be
deserialized by a {@link ComplexMetricExtractor}
- *
* @return column selector factory
*/
public static ColumnSelectorFactory makeColumnSelectorFactory(
final VirtualColumns virtualColumns,
final AggregatorFactory agg,
- final Supplier<InputRow> in,
- final boolean deserializeComplexMetrics
+ final Supplier<InputRow> in
)
{
// we use RowSignature.empty() because ColumnInspector here should be the
InputRow schema, not the
// IncrementalIndex schema, because we are reading values from the InputRow
final RowBasedColumnSelectorFactory<InputRow> baseSelectorFactory =
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
- in::get,
+ in,
RowSignature.empty(),
true,
true
@@ -135,11 +123,9 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
@Override
public ColumnValueSelector<?> makeColumnValueSelector(final String
column)
{
- final boolean isComplexMetric =
agg.getIntermediateType().is(ValueType.COMPLEX);
-
final ColumnValueSelector selector =
baseSelectorFactory.makeColumnValueSelector(column);
- if (!isComplexMetric || !deserializeComplexMetrics) {
+ if (!agg.getIntermediateType().is(ValueType.COMPLEX)) {
return selector;
} else {
// Wrap selector in a special one that uses ComplexMetricSerde to
modify incoming objects.
@@ -226,7 +212,6 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
private final List<Function<InputRow, InputRow>> rowTransformers;
private final VirtualColumns virtualColumns;
private final AggregatorFactory[] metrics;
- private final boolean deserializeComplexMetrics;
private final Metadata metadata;
protected final boolean preserveExistingMetrics;
@@ -252,16 +237,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
/**
- * Setting deserializeComplexMetrics to false is necessary for intermediate
aggregation such as groupBy that
- * should not deserialize input columns using ComplexMetricSerde for
aggregators that return complex metrics.
- * <p>
- * Set concurrentEventAdd to true to indicate that adding of input row
should be thread-safe (for example, groupBy
- * where the multiple threads can add concurrently to the IncrementalIndex).
- *
* @param incrementalIndexSchema the schema to use for incremental index
- * @param deserializeComplexMetrics flag whether or not to call
ComplexMetricExtractor.extractValue() on the input
- * value for aggregators that return
metrics other than float.
- * @param concurrentEventAdd flag whether ot not adding of input rows
should be thread-safe
* @param preserveExistingMetrics When set to true, for any row that
already has metric
* (with the same name defined in
metricSpec), the metric aggregator in metricSpec
* is skipped and the existing metric is
unchanged. If the row does not already have
@@ -273,8 +249,6 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
*/
protected IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
- final boolean deserializeComplexMetrics,
- final boolean concurrentEventAdd,
final boolean preserveExistingMetrics,
final boolean useMaxMemoryEstimates
)
@@ -285,7 +259,6 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
this.virtualColumns = incrementalIndexSchema.getVirtualColumns();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = new CopyOnWriteArrayList<>();
- this.deserializeComplexMetrics = deserializeComplexMetrics;
this.preserveExistingMetrics = preserveExistingMetrics;
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec()
@@ -303,7 +276,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
this.rollup
);
- initAggs(metrics, rowSupplier, deserializeComplexMetrics,
concurrentEventAdd);
+ initAggs(metrics, rowSupplier);
for (AggregatorFactory metric : metrics) {
MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric);
@@ -359,9 +332,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
protected abstract void initAggs(
AggregatorFactory[] metrics,
- Supplier<InputRow> rowSupplier,
- boolean deserializeComplexMetrics,
- boolean concurrentEventAdd
+ Supplier<InputRow> rowSupplier
);
// Note: This method needs to be thread safe.
@@ -740,11 +711,6 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
return numEntries.get();
}
- boolean getDeserializeComplexMetrics()
- {
- return deserializeComplexMetrics;
- }
-
AtomicInteger getNumEntries()
{
return numEntries;
@@ -1054,11 +1020,10 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
protected ColumnSelectorFactory makeColumnSelectorFactory(
final AggregatorFactory agg,
- final Supplier<InputRow> in,
- final boolean deserializeComplexMetrics
+ final Supplier<InputRow> in
)
{
- return makeColumnSelectorFactory(virtualColumns, agg, in,
deserializeComplexMetrics);
+ return makeColumnSelectorFactory(virtualColumns, agg, in);
}
protected final Comparator<IncrementalIndexRow> dimsComparator()
@@ -1127,7 +1092,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
return true;
}
- interface FactsHolder
+ public interface FactsHolder
{
/**
* @return the previous rowIndex associated with the specified key, or
@@ -1161,232 +1126,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
void clear();
}
- static class RollupFactsHolder implements FactsHolder
- {
- private final boolean sortFacts;
- // Can't use Set because we need to be able to get from collection
- private final ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow>
facts;
- private final List<DimensionDesc> dimensionDescsList;
-
- RollupFactsHolder(
- boolean sortFacts,
- Comparator<IncrementalIndexRow> incrementalIndexRowComparator,
- List<DimensionDesc> dimensionDescsList
- )
- {
- this.sortFacts = sortFacts;
- if (sortFacts) {
- this.facts = new
ConcurrentSkipListMap<>(incrementalIndexRowComparator);
- } else {
- this.facts = new ConcurrentHashMap<>();
- }
- this.dimensionDescsList = dimensionDescsList;
- }
-
- @Override
- public int getPriorIndex(IncrementalIndexRow key)
- {
- IncrementalIndexRow row = facts.get(key);
- return row == null ? IncrementalIndexRow.EMPTY_ROW_INDEX :
row.getRowIndex();
- }
-
- @Override
- public long getMinTimeMillis()
- {
- if (sortFacts) {
- return ((ConcurrentNavigableMap<IncrementalIndexRow,
IncrementalIndexRow>) facts).firstKey().getTimestamp();
- } else {
- throw new UnsupportedOperationException("can't get minTime from
unsorted facts data.");
- }
- }
-
- @Override
- public long getMaxTimeMillis()
- {
- if (sortFacts) {
- return ((ConcurrentNavigableMap<IncrementalIndexRow,
IncrementalIndexRow>) facts).lastKey().getTimestamp();
- } else {
- throw new UnsupportedOperationException("can't get maxTime from
unsorted facts data.");
- }
- }
-
- @Override
- public Iterator<IncrementalIndexRow> iterator(boolean descending)
- {
- if (descending && sortFacts) {
- return ((ConcurrentNavigableMap<IncrementalIndexRow,
IncrementalIndexRow>) facts).descendingMap()
-
.keySet()
-
.iterator();
- }
- return keySet().iterator();
- }
-
- @Override
- public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending,
long timeStart, long timeEnd)
- {
- if (!sortFacts) {
- throw new UnsupportedOperationException("can't get timeRange from
unsorted facts data.");
- }
- IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new
Object[]{}, dimensionDescsList);
- IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new
Object[]{}, dimensionDescsList);
- ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> subMap =
- ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>)
facts).subMap(start, end);
- ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap =
descending ? subMap.descendingMap() : subMap;
- return rangeMap.keySet();
- }
-
- @Override
- public Iterable<IncrementalIndexRow> keySet()
- {
- return facts.keySet();
- }
-
- @Override
- public Iterable<IncrementalIndexRow> persistIterable()
- {
- // with rollup, facts are already pre-sorted so just return keyset
- return keySet();
- }
-
- @Override
- public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
- {
- // setRowIndex() must be called before facts.putIfAbsent() for
visibility of rowIndex from concurrent readers.
- key.setRowIndex(rowIndex);
- IncrementalIndexRow prev = facts.putIfAbsent(key, key);
- return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX :
prev.getRowIndex();
- }
-
- @Override
- public void clear()
- {
- facts.clear();
- }
- }
-
- static class PlainFactsHolder implements FactsHolder
- {
- private final boolean sortFacts;
- private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
-
- private final Comparator<IncrementalIndexRow>
incrementalIndexRowComparator;
-
- public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow>
incrementalIndexRowComparator)
- {
- this.sortFacts = sortFacts;
- if (sortFacts) {
- this.facts = new ConcurrentSkipListMap<>();
- } else {
- this.facts = new ConcurrentHashMap<>();
- }
- this.incrementalIndexRowComparator = incrementalIndexRowComparator;
- }
-
- @Override
- public int getPriorIndex(IncrementalIndexRow key)
- {
- // always return EMPTY_ROW_INDEX to indicate that no prior key cause we
always add new row
- return IncrementalIndexRow.EMPTY_ROW_INDEX;
- }
-
- @Override
- public long getMinTimeMillis()
- {
- if (sortFacts) {
- return ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>)
facts).firstKey();
- } else {
- throw new UnsupportedOperationException("can't get minTime from
unsorted facts data.");
- }
- }
-
- @Override
- public long getMaxTimeMillis()
- {
- if (sortFacts) {
- return ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>)
facts).lastKey();
- } else {
- throw new UnsupportedOperationException("can't get maxTime from
unsorted facts data.");
- }
- }
-
- @Override
- public Iterator<IncrementalIndexRow> iterator(boolean descending)
- {
- if (descending && sortFacts) {
- return timeOrderedConcat(((ConcurrentNavigableMap<Long,
Deque<IncrementalIndexRow>>) facts)
- .descendingMap().values(),
true).iterator();
- }
- return timeOrderedConcat(facts.values(), false).iterator();
- }
-
- @Override
- public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending,
long timeStart, long timeEnd)
- {
- ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
- ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>)
facts).subMap(timeStart, timeEnd);
- final ConcurrentMap<Long, Deque<IncrementalIndexRow>> rangeMap =
descending ? subMap.descendingMap() : subMap;
- return timeOrderedConcat(rangeMap.values(), descending);
- }
-
- private Iterable<IncrementalIndexRow> timeOrderedConcat(
- final Iterable<Deque<IncrementalIndexRow>> iterable,
- final boolean descending
- )
- {
- return () -> Iterators.concat(
- Iterators.transform(
- iterable.iterator(),
- input -> descending ? input.descendingIterator() :
input.iterator()
- )
- );
- }
-
- private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
- final Collection<Deque<IncrementalIndexRow>> rowGroups
- )
- {
- return rowGroups.stream()
- .flatMap(Collection::stream)
- .sorted(incrementalIndexRowComparator);
- }
-
- @Override
- public Iterable<IncrementalIndexRow> keySet()
- {
- return timeOrderedConcat(facts.values(), false);
- }
-
- @Override
- public Iterable<IncrementalIndexRow> persistIterable()
- {
- return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
- }
-
- @Override
- public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
- {
- Long time = key.getTimestamp();
- Deque<IncrementalIndexRow> rows = facts.get(time);
- if (rows == null) {
- facts.putIfAbsent(time, new ConcurrentLinkedDeque<>());
- // in race condition, rows may be put by other thread, so always get
latest status from facts
- rows = facts.get(time);
- }
- // setRowIndex() must be called before rows.add() for visibility of
rowIndex from concurrent readers.
- key.setRowIndex(rowIndex);
- rows.add(key);
- // always return EMPTY_ROW_INDEX to indicate that we always add new row
- return IncrementalIndexRow.EMPTY_ROW_INDEX;
- }
-
- @Override
- public void clear()
- {
- facts.clear();
- }
- }
-
- private class LongMetricColumnSelector implements LongColumnSelector
+ private final class LongMetricColumnSelector implements LongColumnSelector
{
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
@@ -1417,7 +1157,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
}
}
- private class ObjectMetricColumnSelector extends ObjectColumnSelector
+ private final class ObjectMetricColumnSelector extends ObjectColumnSelector
{
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
@@ -1454,7 +1194,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
}
}
- private class FloatMetricColumnSelector implements FloatColumnSelector
+ private final class FloatMetricColumnSelector implements FloatColumnSelector
{
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
@@ -1485,7 +1225,7 @@ public abstract class IncrementalIndex implements
Iterable<Row>, Closeable, Colu
}
}
- private class DoubleMetricColumnSelector implements DoubleColumnSelector
+ private final class DoubleMetricColumnSelector implements
DoubleColumnSelector
{
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
index 987ee5f8bf7..70a589f3950 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java
@@ -42,7 +42,7 @@ public final class IncrementalIndexRow
* rowIndex is not checked in {@link #equals} and {@link #hashCode} on
purpose. IncrementalIndexRow acts as a Map key
* and "entry" object (rowIndex is the "value") at the same time. This is
done to reduce object indirection and
* improve locality, and avoid boxing of rowIndex as Integer, when stored in
JDK collection:
- * {@link IncrementalIndex.RollupFactsHolder} needs concurrent collections,
that are not present in fastutil.
+ * {@link OnheapIncrementalIndex.RollupFactsHolder} needs concurrent
collections, that are not present in fastutil.
*/
private int rowIndex;
private long dimsKeySize;
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 8e253f8e903..3449226e4ce 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -47,14 +47,23 @@ import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import java.util.stream.Stream;
/**
*
@@ -72,7 +81,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION =
100;
/**
- * overhead per {@link ConcurrentHashMap.Node} or {@link
java.util.concurrent.ConcurrentSkipListMap.Node} object
+ * overhead per {@link ConcurrentSkipListMap.Node} object in facts table
*/
private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 +
Integer.BYTES;
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new
ConcurrentHashMap<>();
@@ -118,28 +127,19 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
- boolean deserializeComplexMetrics,
- boolean concurrentEventAdd,
- boolean sortFacts,
int maxRowCount,
long maxBytesInMemory,
- // preserveExistingMetrics should only be set true for DruidInputSource
since that is the only case where we can have existing metrics
- // This is currently only use by auto compaction and should not be use
for anything else.
+ // preserveExistingMetrics should only be set true for DruidInputSource
since that is the only case where we can
+ // have existing metrics. This is currently only use by auto compaction
and should not be use for anything else.
boolean preserveExistingMetrics,
boolean useMaxMemoryEstimates
)
{
- super(
- incrementalIndexSchema,
- deserializeComplexMetrics,
- concurrentEventAdd,
- preserveExistingMetrics,
- useMaxMemoryEstimates
- );
+ super(incrementalIndexSchema, preserveExistingMetrics,
useMaxMemoryEstimates);
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE :
maxBytesInMemory;
- this.facts = incrementalIndexSchema.isRollup() ? new
RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
- : new
PlainFactsHolder(sortFacts, dimsComparator());
+ this.facts = incrementalIndexSchema.isRollup() ? new
RollupFactsHolder(dimsComparator(), getDimensions())
+ : new
PlainFactsHolder(dimsComparator());
maxBytesPerRowForAggregators =
useMaxMemoryEstimates ?
getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0;
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
@@ -190,9 +190,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override
protected void initAggs(
final AggregatorFactory[] metrics,
- final Supplier<InputRow> rowSupplier,
- final boolean deserializeComplexMetrics,
- final boolean concurrentEventAdd
+ final Supplier<InputRow> rowSupplier
)
{
selectors = new HashMap<>();
@@ -200,18 +198,14 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
for (AggregatorFactory agg : metrics) {
selectors.put(
agg.getName(),
- new CachingColumnSelectorFactory(
- makeColumnSelectorFactory(agg, rowSupplier,
deserializeComplexMetrics),
- concurrentEventAdd
- )
+ new CachingColumnSelectorFactory(makeColumnSelectorFactory(agg,
rowSupplier))
);
if (preserveExistingMetrics) {
AggregatorFactory combiningAgg = agg.getCombiningFactory();
combiningAggSelectors.put(
combiningAgg.getName(),
new CachingColumnSelectorFactory(
- makeColumnSelectorFactory(combiningAgg, rowSupplier,
deserializeComplexMetrics),
- concurrentEventAdd
+ makeColumnSelectorFactory(combiningAgg, rowSupplier)
)
);
}
@@ -550,6 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
* If preserveExistingMetrics flag is set, then this method will combine
values from two aggregators, the aggregator
* for aggregating from input into output field and the aggregator for
combining already aggregated field, as needed
*/
+ @Nullable
private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[]
aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
{
if (preserveExistingMetrics) {
@@ -605,18 +600,13 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
*/
static class CachingColumnSelectorFactory implements ColumnSelectorFactory
{
- private final Map<String, ColumnValueSelector<?>> columnSelectorMap;
+ private final HashMap<String, ColumnValueSelector<?>> columnSelectorMap;
private final ColumnSelectorFactory delegate;
- public CachingColumnSelectorFactory(ColumnSelectorFactory delegate,
boolean concurrentEventAdd)
+ public CachingColumnSelectorFactory(ColumnSelectorFactory delegate)
{
this.delegate = delegate;
-
- if (concurrentEventAdd) {
- columnSelectorMap = new ConcurrentHashMap<>();
- } else {
- columnSelectorMap = new HashMap<>();
- }
+ this.columnSelectorMap = new HashMap<>();
}
@Override
@@ -628,7 +618,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override
public ColumnValueSelector<?> makeColumnValueSelector(String columnName)
{
- ColumnValueSelector existing = columnSelectorMap.get(columnName);
+ ColumnValueSelector<?> existing = columnSelectorMap.get(columnName);
if (existing != null) {
return existing;
}
@@ -656,9 +646,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
{
return new OnheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema
is null"),
- deserializeComplexMetrics,
- concurrentEventAdd,
- sortFacts,
maxRowCount,
maxBytesInMemory,
preserveExistingMetrics,
@@ -734,4 +721,194 @@ public class OnheapIncrementalIndex extends
IncrementalIndex
return Objects.hash(preserveExistingMetrics);
}
}
+
+ static final class RollupFactsHolder implements FactsHolder
+ {
+ // Can't use Set because we need to be able to get from collection
+ private final ConcurrentNavigableMap<IncrementalIndexRow,
IncrementalIndexRow> facts;
+ private final List<DimensionDesc> dimensionDescsList;
+
+ RollupFactsHolder(
+ Comparator<IncrementalIndexRow> incrementalIndexRowComparator,
+ List<DimensionDesc> dimensionDescsList
+ )
+ {
+ this.facts = new ConcurrentSkipListMap<>(incrementalIndexRowComparator);
+ this.dimensionDescsList = dimensionDescsList;
+ }
+
+ @Override
+ public int getPriorIndex(IncrementalIndexRow key)
+ {
+ IncrementalIndexRow row = facts.get(key);
+ return row == null ? IncrementalIndexRow.EMPTY_ROW_INDEX :
row.getRowIndex();
+ }
+
+ @Override
+ public long getMinTimeMillis()
+ {
+ return facts.firstKey().getTimestamp();
+ }
+
+ @Override
+ public long getMaxTimeMillis()
+ {
+ return facts.lastKey().getTimestamp();
+ }
+
+ @Override
+ public Iterator<IncrementalIndexRow> iterator(boolean descending)
+ {
+ if (descending) {
+ return facts.descendingMap()
+ .keySet()
+ .iterator();
+ }
+ return keySet().iterator();
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending,
long timeStart, long timeEnd)
+ {
+ IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new
Object[]{}, dimensionDescsList);
+ IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new
Object[]{}, dimensionDescsList);
+ ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> subMap
= facts.subMap(start, end);
+ ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap =
descending ? subMap.descendingMap() : subMap;
+ return rangeMap.keySet();
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> keySet()
+ {
+ return facts.keySet();
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> persistIterable()
+ {
+ // with rollup, facts are already pre-sorted so just return keyset
+ return keySet();
+ }
+
+ @Override
+ public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
+ {
+ // setRowIndex() must be called before facts.putIfAbsent() for
visibility of rowIndex from concurrent readers.
+ key.setRowIndex(rowIndex);
+ IncrementalIndexRow prev = facts.putIfAbsent(key, key);
+ return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX :
prev.getRowIndex();
+ }
+
+ @Override
+ public void clear()
+ {
+ facts.clear();
+ }
+ }
+
+ static final class PlainFactsHolder implements FactsHolder
+ {
+ private final ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>
facts;
+
+ private final Comparator<IncrementalIndexRow>
incrementalIndexRowComparator;
+
+ public PlainFactsHolder(Comparator<IncrementalIndexRow>
incrementalIndexRowComparator)
+ {
+ this.facts = new ConcurrentSkipListMap<>();
+ this.incrementalIndexRowComparator = incrementalIndexRowComparator;
+ }
+
+ @Override
+ public int getPriorIndex(IncrementalIndexRow key)
+ {
+ // always return EMPTY_ROW_INDEX to indicate that no prior key cause we
always add new row
+ return IncrementalIndexRow.EMPTY_ROW_INDEX;
+ }
+
+ @Override
+ public long getMinTimeMillis()
+ {
+ return facts.firstKey();
+ }
+
+ @Override
+ public long getMaxTimeMillis()
+ {
+ return facts.lastKey();
+ }
+
+ @Override
+ public Iterator<IncrementalIndexRow> iterator(boolean descending)
+ {
+ if (descending) {
+ return timeOrderedConcat(facts.descendingMap().values(),
true).iterator();
+ }
+ return timeOrderedConcat(facts.values(), false).iterator();
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending,
long timeStart, long timeEnd)
+ {
+ ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
facts.subMap(timeStart, timeEnd);
+ final ConcurrentMap<Long, Deque<IncrementalIndexRow>> rangeMap =
descending ? subMap.descendingMap() : subMap;
+ return timeOrderedConcat(rangeMap.values(), descending);
+ }
+
+ private Iterable<IncrementalIndexRow> timeOrderedConcat(
+ final Iterable<Deque<IncrementalIndexRow>> iterable,
+ final boolean descending
+ )
+ {
+ return () -> Iterators.concat(
+ Iterators.transform(
+ iterable.iterator(),
+ input -> descending ? input.descendingIterator() :
input.iterator()
+ )
+ );
+ }
+
+ private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
+ final Collection<Deque<IncrementalIndexRow>> rowGroups
+ )
+ {
+ return rowGroups.stream()
+ .flatMap(Collection::stream)
+ .sorted(incrementalIndexRowComparator);
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> keySet()
+ {
+ return timeOrderedConcat(facts.values(), false);
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> persistIterable()
+ {
+ return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
+ }
+
+ @Override
+ public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
+ {
+ Long time = key.getTimestamp();
+ Deque<IncrementalIndexRow> rows = facts.get(time);
+ if (rows == null) {
+ facts.putIfAbsent(time, new ConcurrentLinkedDeque<>());
+ // in race condition, rows may be put by other thread, so always get
latest status from facts
+ rows = facts.get(time);
+ }
+ // setRowIndex() must be called before rows.add() for visibility of
rowIndex from concurrent readers.
+ key.setRowIndex(rowIndex);
+ rows.add(key);
+ // always return EMPTY_ROW_INDEX to indicate that we always add new row
+ return IncrementalIndexRow.EMPTY_ROW_INDEX;
+ }
+
+ @Override
+ public void clear()
+ {
+ facts.clear();
+ }
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index c60520fbfa4..fab9357b051 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -480,7 +480,6 @@ public class AggregationTestHelper implements Closeable
outDir,
minTimestamp,
gran,
- true,
maxRowCount,
rollup
);
@@ -498,7 +497,6 @@ public class AggregationTestHelper implements Closeable
File outDir,
long minTimestamp,
Granularity gran,
- boolean deserializeComplexMetrics,
int maxRowCount,
boolean rollup
) throws Exception
@@ -517,7 +515,6 @@ public class AggregationTestHelper implements Closeable
.withRollup(rollup)
.build()
)
- .setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
.build();
@@ -538,7 +535,6 @@ public class AggregationTestHelper implements Closeable
.withRollup(rollup)
.build()
)
- .setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
.build();
}
@@ -594,7 +590,6 @@ public class AggregationTestHelper implements Closeable
final AggregatorFactory[] metrics,
long minTimestamp,
Granularity gran,
- boolean deserializeComplexMetrics,
int maxRowCount,
boolean rollup
) throws Exception
@@ -609,7 +604,6 @@ public class AggregationTestHelper implements Closeable
.withRollup(rollup)
.build()
)
- .setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
.build();
@@ -636,7 +630,6 @@ public class AggregationTestHelper implements Closeable
final AggregatorFactory[] metrics,
long minTimestamp,
Granularity gran,
- boolean deserializeComplexMetrics,
int maxRowCount,
boolean rollup
) throws Exception
@@ -648,7 +641,6 @@ public class AggregationTestHelper implements Closeable
metrics,
minTimestamp,
gran,
- deserializeComplexMetrics,
maxRowCount,
rollup
);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
index 2e516cebf63..b54a5e5db3b 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java
@@ -107,7 +107,6 @@ public class StringColumnAggregationTest
new AggregatorFactory[]{new CountAggregatorFactory("count")},
0,
Granularities.NONE,
- false,
100,
false
);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
index 786a6fc176c..1e9bca48216 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java
@@ -101,7 +101,6 @@ public class SimpleTestIndex
},
0,
Granularities.NONE,
- false,
100,
false
);
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 391a490e2da..2dfe2b7dfbf 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -138,7 +138,6 @@ public class GroupByLimitPushDownInsufficientBufferTest
extends InitializedNullH
.withRollup(withRollup)
.build()
)
- .setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.build();
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 8cd451eb009..28ff970efdf 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -154,7 +154,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest
.withRollup(withRollup)
.build()
)
- .setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.build();
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index 05aa06db0a5..0284edd4127 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -130,7 +130,6 @@ public class GroupByMultiSegmentTest
.withRollup(withRollup)
.build()
)
- .setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.build();
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
index 3892a2018a0..266691ee294 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
@@ -138,7 +138,6 @@ public class GroupByQueryRunnerFactoryTest
{
IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
- .setConcurrentEventAdd(true)
.setMaxRowCount(5000)
.build();
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 8bf2785e2b7..06802b55694 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -132,7 +132,6 @@ public class NestedQueryPushDownTest extends
InitializedNullHandlingTest
))
.build()
)
- .setConcurrentEventAdd(true)
.setMaxRowCount(1000)
.build();
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
index 4f7eb9b8317..d7b51ab0f8b 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
@@ -180,7 +180,7 @@ public class IncrementalIndexCreator implements Closeable
*
* For example, for a parameterized test with the following constrctor:
* {@code
- * public IncrementalIndexTest(String indexType, String mode, boolean
deserializeComplexMetrics)
+ * public IncrementalIndexTest(String indexType, String mode)
* {
* ...
* }
@@ -188,12 +188,11 @@ public class IncrementalIndexCreator implements Closeable
*
* we can test all the input combinations as follows:
* {@code
- * @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
+ * @Parameterized.Parameters(name = "{index}: {0}, {1}")
* public static Collection<?> constructorFeeder()
* {
* return IncrementalIndexCreator.indexTypeCartesianProduct(
- * ImmutableList.of("rollup", "plain"),
- * ImmutableList.of(true, false)
+ * ImmutableList.of("rollup", "plain")
* );
* }
* }
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
index 0b45026e829..c83c4f0da4c 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
@@ -67,8 +67,7 @@ public class IncrementalIndexTest extends
InitializedNullHandlingTest
public IncrementalIndexTest(
String indexType,
- String mode,
- boolean deserializeComplexMetrics
+ String mode
) throws JsonProcessingException
{
this.mode = mode;
@@ -101,18 +100,16 @@ public class IncrementalIndexTest extends
InitializedNullHandlingTest
.build();
indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType,
(builder, args) -> builder
.setIndexSchema(schema)
- .setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(1_000_000)
.build())
);
}
- @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
+ @Parameterized.Parameters(name = "{index}: {0}, {1}")
public static Collection<?> constructorFeeder()
{
return IncrementalIndexCreator.indexTypeCartesianProduct(
- ImmutableList.of("rollup", "plain"),
- ImmutableList.of(true, false)
+ ImmutableList.of("rollup", "plain")
);
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index 02d43e27506..3d0674d2845 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -110,18 +110,12 @@ public class OnheapIncrementalIndexBenchmark extends
AbstractBenchmark
public MapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
- boolean deserializeComplexMetrics,
- boolean concurrentEventAdd,
- boolean sortFacts,
int maxRowCount,
long maxBytesInMemory
)
{
super(
incrementalIndexSchema,
- deserializeComplexMetrics,
- concurrentEventAdd,
- sortFacts,
maxRowCount,
maxBytesInMemory,
false,
@@ -143,9 +137,6 @@ public class OnheapIncrementalIndexBenchmark extends
AbstractBenchmark
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
- true,
- false,
- true,
maxRowCount,
maxBytesInMemory,
false,
@@ -190,7 +181,7 @@ public class OnheapIncrementalIndexBenchmark extends
AbstractBenchmark
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
- makeColumnSelectorFactory(agg, rowSupplier,
getDeserializeComplexMetrics())
+ makeColumnSelectorFactory(agg, rowSupplier)
);
}
Integer rowIndex;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]