This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 609da01 Fix dictionary ID race condition in
IncrementalIndexStorageAdapter (#6340)
609da01 is described below
commit 609da018827697b7ae5ce6cc464762041e9a865e
Author: Jonathan Wei <[email protected]>
AuthorDate: Mon Sep 17 23:43:29 2018 -0700
Fix dictionary ID race condition in IncrementalIndexStorageAdapter (#6340)
Possibly related to https://github.com/apache/incubator-druid/issues/4937
--------
There is currently a race condition in IncrementalIndexStorageAdapter that
can lead to exceptions like the following, when running queries with filters on
String dimensions that hit realtime tasks:
```
org.apache.druid.java.util.common.ISE: id[5] >= maxId[5]
at org.apache.druid.segment.StringDimensionIndexer$1IndexerDimensionSelector.lookupName(StringDimensionIndexer.java:591)
at org.apache.druid.segment.StringDimensionIndexer$1IndexerDimensionSelector$2.matches(StringDimensionIndexer.java:562)
at org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter$IncrementalIndexCursor.advance(IncrementalIndexStorageAdapter.java:284)
```
When the `filterMatcher` is created in the constructor of
`IncrementalIndexStorageAdapter.IncrementalIndexCursor`,
`StringDimensionIndexer.makeDimensionSelector` gets called eventually, which
calls:
```
final int maxId = getCardinality();
...
@Override
public int getCardinality()
{
return dimLookup.size();
}
```
So `maxId` is set to the size of the dictionary at the time that the
`filterMatcher` is created.
However, `maxRowIndex`, which is meant to prevent the Cursor from
returning rows that were added after the Cursor was created (see
https://github.com/apache/incubator-druid/pull/4049), is set after the
`filterMatcher` is created.
If rows with new dictionary values are added after the `filterMatcher` is
created but before `maxRowIndex` is set, then it is possible for the Cursor to
return rows that contain the new values, which will have `id >= maxId`.
This PR sets `maxRowIndex` before creating the `filterMatcher` to prevent
rows with unknown dictionary IDs from being passed to the `filterMatcher`.
-----------
The included test triggers the error with a custom Filter +
DruidPredicateFactory.
The DimensionSelector for predicate-based filter matching is created here
in `Filters.makeValueMatcher`:
```
public static ValueMatcher makeValueMatcher(
final ColumnSelectorFactory columnSelectorFactory,
final String columnName,
final DruidPredicateFactory predicateFactory
)
{
final ColumnCapabilities capabilities =
columnSelectorFactory.getColumnCapabilities(columnName);
// This should be folded into the ValueMatcherColumnSelectorStrategy once that can handle LONG typed columns.
if (capabilities != null && capabilities.getType() == ValueType.LONG) {
return getLongPredicateMatcher(
columnSelectorFactory.makeColumnValueSelector(columnName),
predicateFactory.makeLongPredicate()
);
}
final ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> selector =
DimensionHandlerUtils.createColumnSelectorPlus(
ValueMatcherColumnSelectorStrategyFactory.instance(),
DefaultDimensionSpec.of(columnName),
columnSelectorFactory
);
return selector.getColumnSelectorStrategy().makeValueMatcher(selector.getSelector(), predicateFactory);
}
```
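The test Filter adds a row to the IncrementalIndex when its
predicateFactory creates a new String predicate, which happens after
`DimensionHandlerUtils.createColumnSelectorPlus` is called. The new row's
dictionary value is therefore added after the DimensionSelector (and its
`maxId`) has already been created.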
---
.../IncrementalIndexStorageAdapter.java | 3 +-
.../IncrementalIndexStorageAdapterTest.java | 159 +++++++++++++++++++++
2 files changed, 161 insertions(+), 1 deletion(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
index 53cba0f..94c3cf2 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -236,9 +236,10 @@ public class IncrementalIndexStorageAdapter implements
StorageAdapter
{
currEntry = new IncrementalIndexRowHolder();
columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(index,
virtualColumns, descending, currEntry);
+ // Set maxRowIndex before creating the filterMatcher. See https://github.com/apache/incubator-druid/pull/6340
+ maxRowIndex = index.getLastRowIndex();
filterMatcher = filter == null ? BooleanValueMatcher.of(true) :
filter.makeMatcher(columnSelectorFactory);
numAdvanced = -1;
- maxRowIndex = index.getLastRowIndex();
final long timeStart = Math.max(interval.getStartMillis(),
actualInterval.getStartMillis());
cursorIterable = index.getFacts().timeRangeIterable(
descending,
diff --git
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
index 00e7184..de7a6fc 100644
---
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.segment.incremental;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -33,23 +35,34 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.js.JavaScriptConfig;
+import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.BitmapIndexSelector;
import org.apache.druid.query.filter.DimFilters;
+import org.apache.druid.query.filter.DruidDoublePredicate;
+import org.apache.druid.query.filter.DruidFloatPredicate;
+import org.apache.druid.query.filter.DruidLongPredicate;
+import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryEngine;
import org.apache.druid.query.topn.TopNResultValue;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.SelectorFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -479,6 +492,59 @@ public class IncrementalIndexStorageAdapterTest
}
@Test
+ public void testCursorDictionaryRaceConditionFix() throws Exception
+ {
+ // Tests the dictionary ID race condition bug described at https://github.com/apache/incubator-druid/pull/6340
+
+ final IncrementalIndex index = indexCreator.createIndex();
+ final long timestamp = System.currentTimeMillis();
+
+ for (int i = 0; i < 5; i++) {
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Collections.singletonList("billy"),
+ ImmutableMap.of("billy", "v1" + i)
+ )
+ );
+ }
+
+ final StorageAdapter sa = new IncrementalIndexStorageAdapter(index);
+
+ Sequence<Cursor> cursors = sa.makeCursors(
+ new DictionaryRaceTestFilter(index, timestamp),
+ Intervals.utc(timestamp - 60_000, timestamp + 60_000),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+ final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0);
+
+ cursors
+ .map(cursor -> {
+ DimensionSelector dimSelector = cursor
+ .getColumnSelectorFactory()
+ .makeDimensionSelector(new DefaultDimensionSpec("billy",
"billy"));
+ int cardinality = dimSelector.getValueCardinality();
+
+ int rowNumInCursor = 0;
+ while (!cursor.isDone()) {
+ IndexedInts row = dimSelector.getRow();
+ row.forEach(i -> Assert.assertTrue(i < cardinality));
+ cursor.advance();
+ rowNumInCursor++;
+ }
+ Assert.assertEquals(5, rowNumInCursor);
+ assertCursorsNotEmpty.incrementAndGet();
+
+ return null;
+ })
+ .toList();
+ Assert.assertEquals(1, assertCursorsNotEmpty.get());
+ }
+
+ @Test
public void testCursoringAndSnapshot() throws Exception
{
final IncrementalIndex index = indexCreator.createIndex();
@@ -584,4 +650,97 @@ public class IncrementalIndexStorageAdapterTest
.toList();
Assert.assertEquals(1, assertCursorsNotEmpty.get());
}
+
+ private class DictionaryRaceTestFilter implements Filter
+ {
+ private final IncrementalIndex index;
+ private final long timestamp;
+
+ private DictionaryRaceTestFilter(
+ IncrementalIndex index,
+ long timestamp
+ )
+ {
+ this.index = index;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public <T> T getBitmapResult(
+ BitmapIndexSelector selector, BitmapResultFactory<T>
bitmapResultFactory
+ )
+ {
+ return bitmapResultFactory.wrapAllTrue(Filters.allTrue(selector));
+ }
+
+ @Override
+ public double estimateSelectivity(BitmapIndexSelector indexSelector)
+ {
+ return 1;
+ }
+
+ @Override
+ public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
+ {
+ return Filters.makeValueMatcher(
+ factory,
+ "billy",
+ new DictionaryRaceTestFilterDruidPredicateFactory()
+ );
+ }
+
+ @Override
+ public boolean supportsBitmapIndex(BitmapIndexSelector selector)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean supportsSelectivityEstimation(
+ ColumnSelector columnSelector, BitmapIndexSelector indexSelector
+ )
+ {
+ return true;
+ }
+
+ private class DictionaryRaceTestFilterDruidPredicateFactory implements
DruidPredicateFactory
+ {
+ @Override
+ public Predicate<String> makeStringPredicate()
+ {
+ try {
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Collections.singletonList("billy"),
+ ImmutableMap.of("billy", "v31234")
+ )
+ );
+ }
+ catch (IndexSizeExceededException isee) {
+ throw new RuntimeException(isee);
+ }
+
+ return Predicates.alwaysTrue();
+ }
+
+ @Override
+ public DruidLongPredicate makeLongPredicate()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DruidFloatPredicate makeFloatPredicate()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DruidDoublePredicate makeDoublePredicate()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]