This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7be21c8  Faster bitmap scans (#7530)
7be21c8 is described below

commit 7be21c80b352a02c58807b3c95d535f5fae394cc
Author: Richard Startin <[email protected]>
AuthorDate: Thu Oct 7 00:53:07 2021 +0100

    Faster bitmap scans (#7530)
    
    Uses better APIs from RoaringBitmap to speed up scans by around 25%
---
 .../pinot/core/common/BlockDocIdIterator.java      |   6 +
 .../dociditerators/MVScanDocIdIterator.java        |  29 ++-
 .../dociditerators/SVScanDocIdIterator.java        |  27 ++-
 .../org/apache/pinot/perf/BenchmarkRangeIndex.java |  65 ------
 .../pinot/perf/BenchmarkScanDocIdIterators.java    | 246 +++++++++++++++++++++
 .../java/org/apache/pinot/perf/Distribution.java   |  89 ++++++++
 6 files changed, 378 insertions(+), 84 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java
index f13e96e..2dc9bd1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java
@@ -41,4 +41,10 @@ public interface BlockDocIdIterator {
    * <p>NOTE: There should be no more calls to this method after it returns {@link Constants#EOF}.
    */
   int advance(int targetDocId);
+
+  /**
+   * Empirically determined to be the best batch size for batch iterators.
+   * @see {https://github.com/RoaringBitmap/RoaringBitmap/pull/243#issuecomment-381278304}
+   */
+  int OPTIMAL_ITERATOR_BATCH_SIZE = 256;
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
index a2a605e..aa820e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
@@ -22,7 +22,8 @@ import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.BatchIterator;
+import org.roaringbitmap.RoaringBitmapWriter;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
@@ -73,17 +74,25 @@ public final class MVScanDocIdIterator implements ScanBasedDocIdIterator {
 
   @Override
   public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
-    MutableRoaringBitmap result = new MutableRoaringBitmap();
-    IntIterator docIdIterator = docIds.getIntIterator();
-    int nextDocId;
-    while (docIdIterator.hasNext() && (nextDocId = docIdIterator.next()) < _numDocs) {
-      int length = _reader.getDictIdMV(nextDocId, _dictIdBuffer, _readerContext);
-      _numEntriesScanned += length;
-      if (_predicateEvaluator.applyMV(_dictIdBuffer, length)) {
-        result.add(nextDocId);
+    if (docIds.isEmpty()) {
+      return new MutableRoaringBitmap();
+    }
+    RoaringBitmapWriter<MutableRoaringBitmap> result = RoaringBitmapWriter.bufferWriter()
+        .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
+    BatchIterator docIdIterator = docIds.getBatchIterator();
+    int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+    while (docIdIterator.hasNext()) {
+      int limit = docIdIterator.nextBatch(buffer);
+      for (int i = 0; i < limit; i++) {
+        int nextDocId = buffer[i];
+        int length = _reader.getDictIdMV(nextDocId, _dictIdBuffer, _readerContext);
+        _numEntriesScanned += length;
+        if (_predicateEvaluator.applyMV(_dictIdBuffer, length)) {
+          result.add(nextDocId);
+        }
       }
     }
-    return result;
+    return result.get();
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
index 986e08e..f327abe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
@@ -22,7 +22,8 @@ import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.BatchIterator;
+import org.roaringbitmap.RoaringBitmapWriter;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
@@ -72,16 +73,24 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator {
 
   @Override
   public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
-    MutableRoaringBitmap result = new MutableRoaringBitmap();
-    IntIterator docIdIterator = docIds.getIntIterator();
-    int nextDocId;
-    while (docIdIterator.hasNext() && (nextDocId = docIdIterator.next()) < _numDocs) {
-      _numEntriesScanned++;
-      if (_valueMatcher.doesValueMatch(nextDocId)) {
-        result.add(nextDocId);
+    if (docIds.isEmpty()) {
+      return new MutableRoaringBitmap();
+    }
+    RoaringBitmapWriter<MutableRoaringBitmap> result = RoaringBitmapWriter.bufferWriter()
+        .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
+    BatchIterator docIdIterator = docIds.getBatchIterator();
+    int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+    while (docIdIterator.hasNext()) {
+      int limit = docIdIterator.nextBatch(buffer);
+      for (int i = 0; i < limit; i++) {
+        int nextDocId = buffer[i];
+        _numEntriesScanned++;
+        if (_valueMatcher.doesValueMatch(nextDocId)) {
+          result.add(nextDocId);
+        }
       }
     }
-    return result;
+    return result.get();
   }
 
   @Override
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
index 25079ca..60db2a3 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
@@ -21,13 +21,10 @@ package org.apache.pinot.perf;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Random;
-import java.util.SplittableRandom;
 import java.util.function.DoubleSupplier;
 import java.util.function.LongSupplier;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
 import org.apache.pinot.segment.local.segment.index.readers.BitSlicedRangeIndexReader;
@@ -59,68 +56,6 @@ public class BenchmarkRangeIndex {
 
   private static final String COLUMN_NAME = "col";
 
-  public enum Distribution {
-    NORMAL {
-      @Override
-      public DoubleSupplier createDouble(long seed, double... params) {
-        Random random = new Random(seed);
-        return () -> random.nextGaussian() * params[1] + params[0];
-      }
-    },
-    UNIFORM {
-      @Override
-      public DoubleSupplier createDouble(long seed, double... params) {
-        Random random = new Random(seed);
-        return () -> (params[1] - params[0]) * random.nextDouble() + params[0];
-      }
-    },
-    EXP {
-      @Override
-      public DoubleSupplier createDouble(long seed, double... params) {
-        Random random = new Random(seed);
-        return () -> -(Math.log(random.nextDouble()) / params[0]);
-      }
-    },
-    POWER {
-      @Override
-      public DoubleSupplier createDouble(long seed, double... params) {
-        long min = (long) params[0];
-        long max = (long) params[1];
-        double alpha = params[2];
-        SplittableRandom random = new SplittableRandom(seed);
-        return () -> (Math.pow((Math.pow(max, alpha + 1)
-            - Math.pow(min, alpha + 1) * (random.nextDouble() + 1)), 1D / (alpha + 1)));
-      }
-    };
-
-    public LongSupplier createLong(long seed, double... params) {
-      DoubleSupplier source = createDouble(seed, params);
-      return () -> (long) source.getAsDouble();
-    }
-
-    public abstract DoubleSupplier createDouble(long seed, double... params);
-
-    public static LongSupplier createLongSupplier(long seed, String spec) {
-      Pair<Distribution, double[]> parsed = parse(spec);
-      return parsed.getKey().createLong(seed, parsed.getValue());
-    }
-
-    public static DoubleSupplier createDoubleSupplier(long seed, String spec) {
-      Pair<Distribution, double[]> parsed = parse(spec);
-      return parsed.getKey().createDouble(seed, parsed.getValue());
-    }
-
-    private static Pair<Distribution, double[]> parse(String spec) {
-      int paramsStart = spec.indexOf('(');
-      int paramsEnd = spec.indexOf(')');
-      double[] params = Arrays.stream(spec.substring(paramsStart + 1, paramsEnd).split(","))
-          .mapToDouble(s -> Double.parseDouble(s.trim()))
-          .toArray();
-      String dist = spec.substring(0, paramsStart).toUpperCase();
-      return Pair.of(Distribution.valueOf(dist), params);
-    }
-  }
-
   @State(Scope.Benchmark)
   public static class BaseState {
     @Param({"INT", "LONG", "FLOAT", "DOUBLE"})
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkScanDocIdIterators.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkScanDocIdIterators.java
new file mode 100644
index 0000000..56e7787
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkScanDocIdIterators.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.operator.dociditerators.SVScanDocIdIterator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkScanDocIdIterators {
+
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BenchmarkScanDocIdIterators");
+
+  @Param("10000000")
+  int _numDocs;
+
+  // how selective the bitmap is
+  @Param({"1", "2"})
+  private int _bitmapQuantile;
+
+  // how selective the predicate evaluator is
+  @Param({"1", "2"})
+  private int _thresholdQuantile;
+
+  @Param("42")
+  long _seed;
+
+  @Param({"UNIFORM(0,10000000)"})
+  String _distribution;
+
+  private DummyPredicateEvaluator _predicateEvaluator;
+  private FixedBitSVForwardIndexReaderV2 _readerV2;
+  private ImmutableRoaringBitmap _bitmap;
+  private PinotDataBuffer _dataBuffer;
+
+  @Setup(Level.Trial)
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+    FileUtils.forceMkdir(INDEX_DIR);
+    File indexFile = new File(INDEX_DIR, "index-file");
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = RoaringBitmapWriter.bufferWriter().get();
+    LongSupplier supplier = Distribution.createLongSupplier(_seed, _distribution);
+    int[] values = new int[_numDocs];
+    int max = Integer.MIN_VALUE;
+    for (int i = 0; i < values.length; i++) {
+      values[i] = (int) supplier.getAsLong();
+      max = Math.max(values[i], max);
+    }
+    int numBits = 32 - Integer.numberOfLeadingZeros(max);
+    int[] sorted = Arrays.copyOf(values, values.length);
+    Arrays.sort(sorted);
+    try (FixedBitSVForwardIndexWriter indexWriter = new FixedBitSVForwardIndexWriter(indexFile, _numDocs, numBits)) {
+      for (int i = 0; i < _numDocs; i++) {
+        indexWriter.putDictId(values[i]);
+      }
+    }
+    _dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
+    for (int i = 0; i < values.length; i++) {
+      if (values[i] < sorted[_bitmapQuantile * sorted.length / 10]) {
+        writer.add(i);
+      }
+    }
+    _bitmap = writer.get();
+    _predicateEvaluator = new DummyPredicateEvaluator(sorted[_thresholdQuantile * sorted.length / 10]);
+    _readerV2 = new FixedBitSVForwardIndexReaderV2(_dataBuffer, values.length, numBits);
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown()
+      throws Exception {
+    _dataBuffer.close();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  @Benchmark
+  public MutableRoaringBitmap benchmarkSVLong() {
+    return new SVScanDocIdIterator(_predicateEvaluator, _readerV2, _numDocs).applyAnd(_bitmap);
+  }
+
+  public static class DummyPredicateEvaluator implements PredicateEvaluator {
+
+    private final int _threshold;
+
+    public DummyPredicateEvaluator(int threshold) {
+      _threshold = threshold;
+    }
+
+    @Override
+    public Predicate.Type getPredicateType() {
+      return null;
+    }
+
+    @Override
+    public boolean isDictionaryBased() {
+      return true;
+    }
+
+    @Override
+    public FieldSpec.DataType getDataType() {
+      return null;
+    }
+
+    @Override
+    public boolean isExclusive() {
+      return false;
+    }
+
+    @Override
+    public boolean isAlwaysTrue() {
+      return false;
+    }
+
+    @Override
+    public boolean isAlwaysFalse() {
+      return false;
+    }
+
+    @Override
+    public boolean applySV(int value) {
+      return value < _threshold;
+    }
+
+    @Override
+    public boolean applyMV(int[] values, int length) {
+      return false;
+    }
+
+    @Override
+    public int getNumMatchingDictIds() {
+      return 0;
+    }
+
+    @Override
+    public int[] getMatchingDictIds() {
+      return new int[0];
+    }
+
+    @Override
+    public int getNumNonMatchingDictIds() {
+      return 0;
+    }
+
+    @Override
+    public int[] getNonMatchingDictIds() {
+      return new int[0];
+    }
+
+    @Override
+    public boolean applySV(long value) {
+      return false;
+    }
+
+    @Override
+    public boolean applyMV(long[] values, int length) {
+      return false;
+    }
+
+    @Override
+    public boolean applySV(float value) {
+      return false;
+    }
+
+    @Override
+    public boolean applyMV(float[] values, int length) {
+      return false;
+    }
+
+    @Override
+    public boolean applySV(double value) {
+      return false;
+    }
+
+    @Override
+    public boolean applyMV(double[] values, int length) {
+      return false;
+    }
+
+    @Override
+    public boolean applySV(String value) {
+      return false;
+    }
+
+    @Override
+    public boolean applyMV(String[] values, int length) {
+      return false;
+    }
+
+    @Override
+    public boolean applySV(byte[] value) {
+      return false;
+    }
+
+    @Override
+    public boolean applyMV(byte[][] values, int length) {
+      return false;
+    }
+  }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java b/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
new file mode 100644
index 0000000..7739f09
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.SplittableRandom;
+import java.util.function.DoubleSupplier;
+import java.util.function.LongSupplier;
+import org.apache.commons.lang3.tuple.Pair;
+
+
+public enum Distribution {
+  NORMAL {
+    @Override
+    public DoubleSupplier createDouble(long seed, double... params) {
+      Random random = new Random(seed);
+      return () -> random.nextGaussian() * params[1] + params[0];
+    }
+  },
+  UNIFORM {
+    @Override
+    public DoubleSupplier createDouble(long seed, double... params) {
+      Random random = new Random(seed);
+      return () -> (params[1] - params[0]) * random.nextDouble() + params[0];
+    }
+  },
+  EXP {
+    @Override
+    public DoubleSupplier createDouble(long seed, double... params) {
+      Random random = new Random(seed);
+      return () -> -(Math.log(random.nextDouble()) / params[0]);
+    }
+  },
+  POWER {
+    @Override
+    public DoubleSupplier createDouble(long seed, double... params) {
+      long min = (long) params[0];
+      long max = (long) params[1];
+      double alpha = params[2];
+      SplittableRandom random = new SplittableRandom(seed);
+      return () -> (Math.pow((Math.pow(max, alpha + 1)
+          - Math.pow(min, alpha + 1) * (random.nextDouble() + 1)), 1D / (alpha + 1)));
+    }
+  };
+
+  public LongSupplier createLong(long seed, double... params) {
+    DoubleSupplier source = createDouble(seed, params);
+    return () -> (long) source.getAsDouble();
+  }
+
+  public abstract DoubleSupplier createDouble(long seed, double... params);
+
+  public static LongSupplier createLongSupplier(long seed, String spec) {
+    Pair<Distribution, double[]> parsed = parse(spec);
+    return parsed.getKey().createLong(seed, parsed.getValue());
+  }
+
+  public static DoubleSupplier createDoubleSupplier(long seed, String spec) {
+    Pair<Distribution, double[]> parsed = parse(spec);
+    return parsed.getKey().createDouble(seed, parsed.getValue());
+  }
+
+  private static Pair<Distribution, double[]> parse(String spec) {
+    int paramsStart = spec.indexOf('(');
+    int paramsEnd = spec.indexOf(')');
+    double[] params = Arrays.stream(spec.substring(paramsStart + 1, paramsEnd).split(","))
+        .mapToDouble(s -> Double.parseDouble(s.trim()))
+        .toArray();
+    String dist = spec.substring(0, paramsStart).toUpperCase();
+    return Pair.of(Distribution.valueOf(dist), params);
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to