This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 609da01  Fix dictionary ID race condition in IncrementalIndexStorageAdapter (#6340)
609da01 is described below

commit 609da018827697b7ae5ce6cc464762041e9a865e
Author: Jonathan Wei <[email protected]>
AuthorDate: Mon Sep 17 23:43:29 2018 -0700

    Fix dictionary ID race condition in IncrementalIndexStorageAdapter (#6340)
    
    Possibly related to https://github.com/apache/incubator-druid/issues/4937
    
    --------
    
    There is currently a race condition in IncrementalIndexStorageAdapter that can lead to exceptions like the following, when running queries with filters on String dimensions that hit realtime tasks:
    
    ```
    org.apache.druid.java.util.common.ISE: id[5] >= maxId[5]
        at org.apache.druid.segment.StringDimensionIndexer$1IndexerDimensionSelector.lookupName(StringDimensionIndexer.java:591)
        at org.apache.druid.segment.StringDimensionIndexer$1IndexerDimensionSelector$2.matches(StringDimensionIndexer.java:562)
        at org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter$IncrementalIndexCursor.advance(IncrementalIndexStorageAdapter.java:284)
    ```
    
    When the `filterMatcher` is created in the constructor of `IncrementalIndexStorageAdapter.IncrementalIndexCursor`, `StringDimensionIndexer.makeDimensionSelector` is eventually called, which runs:
    
    ```
    final int maxId = getCardinality();
    ...

    @Override
    public int getCardinality()
    {
      return dimLookup.size();
    }
    ```
    
    So `maxId` is set to the size of the dictionary at the time that the `filterMatcher` is created.
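    
    As a rough illustration (a hypothetical, simplified model, not Druid's actual selector class), the snapshot-and-check behaviour can be sketched like this. `SnapshotSelectorSketch` and `demo` are made-up names; only the `id[..] >= maxId[..]` message shape is taken from the trace above:
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical stand-in for a dictionary-backed selector; not a Druid class.
    class SnapshotSelectorSketch
    {
      private final List<String> dimLookup;
      private final int maxId;

      SnapshotSelectorSketch(List<String> dimLookup)
      {
        this.dimLookup = dimLookup;
        // Cardinality is captured once, when the selector is created.
        this.maxId = dimLookup.size();
      }

      String lookupName(int id)
      {
        // IDs assigned after the snapshot fail this check.
        if (id >= maxId) {
          throw new IllegalStateException("id[" + id + "] >= maxId[" + maxId + "]");
        }
        return dimLookup.get(id);
      }

      // Builds a 5-entry dictionary, snapshots it, then adds a sixth value
      // and looks up its ID (5). Returns the looked-up name or the error message.
      static String demo()
      {
        List<String> dict = new ArrayList<>(Arrays.asList("v10", "v11", "v12", "v13", "v14"));
        SnapshotSelectorSketch selector = new SnapshotSelectorSketch(dict);
        dict.add("v31234");
        try {
          return selector.lookupName(5);
        }
        catch (IllegalStateException e) {
          return e.getMessage();
        }
      }
    }
    ```
    
    In this toy model, `demo()` hits the bounds check because the sixth value was added to the dictionary after `maxId` was captured.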
    
    However, `maxRowIndex`, which is meant to prevent the Cursor from returning rows that were added after the Cursor was created (see https://github.com/apache/incubator-druid/pull/4049), is set after the `filterMatcher` is created.
    
    If rows with new dictionary values are added after the `filterMatcher` is created but before `maxRowIndex` is set, then it is possible for the Cursor to return rows that contain the new values, which will have `id >= maxId`.
    
    This PR sets `maxRowIndex` before creating the `filterMatcher` to prevent rows with unknown dictionary IDs from being passed to the `filterMatcher`.
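    
    A minimal single-threaded sketch of why the ordering matters, assuming a toy index where each new value gets the next dictionary ID. All names here (`DictionaryRaceSketch`, `countUnknownIds`, `run`) are hypothetical, and the concurrent write is simulated with a callback that fires right after `maxId` is captured:
    
    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical toy model of an incremental index; not a Druid API.
    class DictionaryRaceSketch
    {
      private final List<String> dictionary = new ArrayList<>();
      private final List<Integer> rows = new ArrayList<>();

      void add(String value)
      {
        int id = dictionary.indexOf(value);
        if (id < 0) {
          id = dictionary.size();
          dictionary.add(value);
        }
        rows.add(id);
      }

      // Counts rows visible to the "cursor" whose dictionary ID is >= maxId.
      int countUnknownIds(boolean snapshotRowsFirst, Runnable concurrentWrite)
      {
        int maxRowIndex = -1;
        if (snapshotRowsFirst) {
          maxRowIndex = rows.size() - 1;   // new order: row bound captured first
        }
        int maxId = dictionary.size();     // captured when the matcher is created
        concurrentWrite.run();             // a writer adds a row with a new value
        if (!snapshotRowsFirst) {
          maxRowIndex = rows.size() - 1;   // old order: row bound captured last
        }

        int unknown = 0;
        for (int i = 0; i <= maxRowIndex; i++) {
          if (rows.get(i) >= maxId) {
            unknown++;
          }
        }
        return unknown;
      }

      static int run(boolean snapshotRowsFirst)
      {
        DictionaryRaceSketch index = new DictionaryRaceSketch();
        for (int i = 0; i < 5; i++) {
          index.add("v1" + i);
        }
        return index.countUnknownIds(snapshotRowsFirst, () -> index.add("v31234"));
      }

      public static void main(String[] args)
      {
        System.out.println("old order, unknown ids: " + run(false));
        System.out.println("new order, unknown ids: " + run(true));
      }
    }
    ```
    
    With the old ordering, `run(false)` reports one visible row whose ID is not below `maxId`; with the new ordering, `run(true)` reports none, because the row bound is fixed before any later dictionary growth can be observed.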
    
    -----------
    
    The included test triggers the error with a custom Filter + DruidPredicateFactory.
    
    The DimensionSelector for predicate-based filter matching is created here in `Filters.makeValueMatcher`:
    
    ```
      public static ValueMatcher makeValueMatcher(
          final ColumnSelectorFactory columnSelectorFactory,
          final String columnName,
          final DruidPredicateFactory predicateFactory
      )
      {
        final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(columnName);
    
        // This should be folded into the ValueMatcherColumnSelectorStrategy once that can handle LONG typed columns.
        if (capabilities != null && capabilities.getType() == ValueType.LONG) {
          return getLongPredicateMatcher(
              columnSelectorFactory.makeColumnValueSelector(columnName),
              predicateFactory.makeLongPredicate()
          );
        }
    
        final ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> selector =
            DimensionHandlerUtils.createColumnSelectorPlus(
                ValueMatcherColumnSelectorStrategyFactory.instance(),
                DefaultDimensionSpec.of(columnName),
                columnSelectorFactory
            );
    
        return selector.getColumnSelectorStrategy().makeValueMatcher(selector.getSelector(), predicateFactory);
      }
    ```
    
    The test Filter adds a row to the IncrementalIndex when its predicateFactory creates a new String predicate, which happens after `DimensionHandlerUtils.createColumnSelectorPlus` has been called (and therefore after the DimensionSelector has captured `maxId`).
---
 .../IncrementalIndexStorageAdapter.java            |   3 +-
 .../IncrementalIndexStorageAdapterTest.java        | 159 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 1 deletion(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
index 53cba0f..94c3cf2 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -236,9 +236,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
     {
       currEntry = new IncrementalIndexRowHolder();
       columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(index, virtualColumns, descending, currEntry);
+      // Set maxRowIndex before creating the filterMatcher. See https://github.com/apache/incubator-druid/pull/6340
+      maxRowIndex = index.getLastRowIndex();
       filterMatcher = filter == null ? BooleanValueMatcher.of(true) : filter.makeMatcher(columnSelectorFactory);
       numAdvanced = -1;
-      maxRowIndex = index.getLastRowIndex();
       final long timeStart = Math.max(interval.getStartMillis(), actualInterval.getStartMillis());
       cursorIterable = index.getFacts().timeRangeIterable(
           descending,
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
index 00e7184..de7a6fc 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -33,23 +35,34 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.js.JavaScriptConfig;
+import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.BitmapIndexSelector;
 import org.apache.druid.query.filter.DimFilters;
+import org.apache.druid.query.filter.DruidDoublePredicate;
+import org.apache.druid.query.filter.DruidFloatPredicate;
+import org.apache.druid.query.filter.DruidLongPredicate;
+import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.ValueMatcher;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryEngine;
 import org.apache.druid.query.topn.TopNQueryBuilder;
 import org.apache.druid.query.topn.TopNQueryEngine;
 import org.apache.druid.query.topn.TopNResultValue;
+import org.apache.druid.segment.ColumnSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.filter.Filters;
 import org.apache.druid.segment.filter.SelectorFilter;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -479,6 +492,59 @@ public class IncrementalIndexStorageAdapterTest
   }
 
   @Test
+  public void testCursorDictionaryRaceConditionFix() throws Exception
+  {
+    // Tests the dictionary ID race condition bug described at https://github.com/apache/incubator-druid/pull/6340
+
+    final IncrementalIndex index = indexCreator.createIndex();
+    final long timestamp = System.currentTimeMillis();
+
+    for (int i = 0; i < 5; i++) {
+      index.add(
+          new MapBasedInputRow(
+              timestamp,
+              Collections.singletonList("billy"),
+              ImmutableMap.of("billy", "v1" + i)
+          )
+      );
+    }
+
+    final StorageAdapter sa = new IncrementalIndexStorageAdapter(index);
+
+    Sequence<Cursor> cursors = sa.makeCursors(
+        new DictionaryRaceTestFilter(index, timestamp),
+        Intervals.utc(timestamp - 60_000, timestamp + 60_000),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+    final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0);
+
+    cursors
+        .map(cursor -> {
+          DimensionSelector dimSelector = cursor
+              .getColumnSelectorFactory()
+              .makeDimensionSelector(new DefaultDimensionSpec("billy", "billy"));
+          int cardinality = dimSelector.getValueCardinality();
+
+          int rowNumInCursor = 0;
+          while (!cursor.isDone()) {
+            IndexedInts row = dimSelector.getRow();
+            row.forEach(i -> Assert.assertTrue(i < cardinality));
+            cursor.advance();
+            rowNumInCursor++;
+          }
+          Assert.assertEquals(5, rowNumInCursor);
+          assertCursorsNotEmpty.incrementAndGet();
+
+          return null;
+        })
+        .toList();
+    Assert.assertEquals(1, assertCursorsNotEmpty.get());
+  }
+
+  @Test
   public void testCursoringAndSnapshot() throws Exception
   {
     final IncrementalIndex index = indexCreator.createIndex();
@@ -584,4 +650,97 @@ public class IncrementalIndexStorageAdapterTest
         .toList();
     Assert.assertEquals(1, assertCursorsNotEmpty.get());
   }
+
+  private class DictionaryRaceTestFilter implements Filter
+  {
+    private final IncrementalIndex index;
+    private final long timestamp;
+
+    private DictionaryRaceTestFilter(
+        IncrementalIndex index,
+        long timestamp
+    )
+    {
+      this.index = index;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public <T> T getBitmapResult(
+        BitmapIndexSelector selector, BitmapResultFactory<T> bitmapResultFactory
+    )
+    {
+      return bitmapResultFactory.wrapAllTrue(Filters.allTrue(selector));
+    }
+
+    @Override
+    public double estimateSelectivity(BitmapIndexSelector indexSelector)
+    {
+      return 1;
+    }
+
+    @Override
+    public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
+    {
+      return Filters.makeValueMatcher(
+          factory,
+          "billy",
+          new DictionaryRaceTestFilterDruidPredicateFactory()
+      );
+    }
+
+    @Override
+    public boolean supportsBitmapIndex(BitmapIndexSelector selector)
+    {
+      return true;
+    }
+
+    @Override
+    public boolean supportsSelectivityEstimation(
+        ColumnSelector columnSelector, BitmapIndexSelector indexSelector
+    )
+    {
+      return true;
+    }
+
+    private class DictionaryRaceTestFilterDruidPredicateFactory implements DruidPredicateFactory
+    {
+      @Override
+      public Predicate<String> makeStringPredicate()
+      {
+        try {
+          index.add(
+              new MapBasedInputRow(
+                  timestamp,
+                  Collections.singletonList("billy"),
+                  ImmutableMap.of("billy", "v31234")
+              )
+          );
+        }
+        catch (IndexSizeExceededException isee) {
+          throw new RuntimeException(isee);
+        }
+
+        return Predicates.alwaysTrue();
+      }
+
+      @Override
+      public DruidLongPredicate makeLongPredicate()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public DruidFloatPredicate makeFloatPredicate()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public DruidDoublePredicate makeDoublePredicate()
+      {
+        throw new UnsupportedOperationException();
+      }
+    }
+  }
 }

