This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7be21c8 Faster bitmap scans (#7530)
7be21c8 is described below
commit 7be21c80b352a02c58807b3c95d535f5fae394cc
Author: Richard Startin <[email protected]>
AuthorDate: Thu Oct 7 00:53:07 2021 +0100
Faster bitmap scans (#7530)
Uses better APIs from RoaringBitmap (BatchIterator and RoaringBitmapWriter) to speed up scans by around 25%.
---
.../pinot/core/common/BlockDocIdIterator.java | 6 +
.../dociditerators/MVScanDocIdIterator.java | 29 ++-
.../dociditerators/SVScanDocIdIterator.java | 27 ++-
.../org/apache/pinot/perf/BenchmarkRangeIndex.java | 65 ------
.../pinot/perf/BenchmarkScanDocIdIterators.java | 246 +++++++++++++++++++++
.../java/org/apache/pinot/perf/Distribution.java | 89 ++++++++
6 files changed, 378 insertions(+), 84 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java
index f13e96e..2dc9bd1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockDocIdIterator.java
@@ -41,4 +41,10 @@ public interface BlockDocIdIterator {
* <p>NOTE: There should be no more calls to this method after it returns {@link Constants#EOF}.
*/
int advance(int targetDocId);
+
+ /**
+ * Empirically determined to be the best batch size for batch iterators.
+ * @see {https://github.com/RoaringBitmap/RoaringBitmap/pull/243#issuecomment-381278304}
+ */
+ int OPTIMAL_ITERATOR_BATCH_SIZE = 256;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
index a2a605e..aa820e7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
@@ -22,7 +22,8 @@ import
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.BatchIterator;
+import org.roaringbitmap.RoaringBitmapWriter;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -73,17 +74,25 @@ public final class MVScanDocIdIterator implements
ScanBasedDocIdIterator {
@Override
public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
- MutableRoaringBitmap result = new MutableRoaringBitmap();
- IntIterator docIdIterator = docIds.getIntIterator();
- int nextDocId;
- while (docIdIterator.hasNext() && (nextDocId = docIdIterator.next()) <
_numDocs) {
- int length = _reader.getDictIdMV(nextDocId, _dictIdBuffer,
_readerContext);
- _numEntriesScanned += length;
- if (_predicateEvaluator.applyMV(_dictIdBuffer, length)) {
- result.add(nextDocId);
+ if (docIds.isEmpty()) {
+ return new MutableRoaringBitmap();
+ }
+ RoaringBitmapWriter<MutableRoaringBitmap> result =
RoaringBitmapWriter.bufferWriter()
+ .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
+ BatchIterator docIdIterator = docIds.getBatchIterator();
+ int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ while (docIdIterator.hasNext()) {
+ int limit = docIdIterator.nextBatch(buffer);
+ for (int i = 0; i < limit; i++) {
+ int nextDocId = buffer[i];
+ int length = _reader.getDictIdMV(nextDocId, _dictIdBuffer,
_readerContext);
+ _numEntriesScanned += length;
+ if (_predicateEvaluator.applyMV(_dictIdBuffer, length)) {
+ result.add(nextDocId);
+ }
}
}
- return result;
+ return result.get();
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
index 986e08e..f327abe 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
@@ -22,7 +22,8 @@ import
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.BatchIterator;
+import org.roaringbitmap.RoaringBitmapWriter;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -72,16 +73,24 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
@Override
public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
- MutableRoaringBitmap result = new MutableRoaringBitmap();
- IntIterator docIdIterator = docIds.getIntIterator();
- int nextDocId;
- while (docIdIterator.hasNext() && (nextDocId = docIdIterator.next()) <
_numDocs) {
- _numEntriesScanned++;
- if (_valueMatcher.doesValueMatch(nextDocId)) {
- result.add(nextDocId);
+ if (docIds.isEmpty()) {
+ return new MutableRoaringBitmap();
+ }
+ RoaringBitmapWriter<MutableRoaringBitmap> result =
RoaringBitmapWriter.bufferWriter()
+ .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
+ BatchIterator docIdIterator = docIds.getBatchIterator();
+ int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ while (docIdIterator.hasNext()) {
+ int limit = docIdIterator.nextBatch(buffer);
+ for (int i = 0; i < limit; i++) {
+ int nextDocId = buffer[i];
+ _numEntriesScanned++;
+ if (_valueMatcher.doesValueMatch(nextDocId)) {
+ result.add(nextDocId);
+ }
}
}
- return result;
+ return result.get();
}
@Override
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
index 25079ca..60db2a3 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
@@ -21,13 +21,10 @@ package org.apache.pinot.perf;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Random;
-import java.util.SplittableRandom;
import java.util.function.DoubleSupplier;
import java.util.function.LongSupplier;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
import
org.apache.pinot.segment.local.segment.index.readers.BitSlicedRangeIndexReader;
@@ -59,68 +56,6 @@ public class BenchmarkRangeIndex {
private static final String COLUMN_NAME = "col";
- public enum Distribution {
- NORMAL {
- @Override
- public DoubleSupplier createDouble(long seed, double... params) {
- Random random = new Random(seed);
- return () -> random.nextGaussian() * params[1] + params[0];
- }
- },
- UNIFORM {
- @Override
- public DoubleSupplier createDouble(long seed, double... params) {
- Random random = new Random(seed);
- return () -> (params[1] - params[0]) * random.nextDouble() + params[0];
- }
- },
- EXP {
- @Override
- public DoubleSupplier createDouble(long seed, double... params) {
- Random random = new Random(seed);
- return () -> -(Math.log(random.nextDouble()) / params[0]);
- }
- },
- POWER {
- @Override
- public DoubleSupplier createDouble(long seed, double... params) {
- long min = (long) params[0];
- long max = (long) params[1];
- double alpha = params[2];
- SplittableRandom random = new SplittableRandom(seed);
- return () -> (Math.pow((Math.pow(max, alpha + 1)
- - Math.pow(min, alpha + 1) * (random.nextDouble() + 1)), 1D /
(alpha + 1)));
- }
- };
-
- public LongSupplier createLong(long seed, double... params) {
- DoubleSupplier source = createDouble(seed, params);
- return () -> (long) source.getAsDouble();
- }
-
- public abstract DoubleSupplier createDouble(long seed, double... params);
-
- public static LongSupplier createLongSupplier(long seed, String spec) {
- Pair<Distribution, double[]> parsed = parse(spec);
- return parsed.getKey().createLong(seed, parsed.getValue());
- }
-
- public static DoubleSupplier createDoubleSupplier(long seed, String spec) {
- Pair<Distribution, double[]> parsed = parse(spec);
- return parsed.getKey().createDouble(seed, parsed.getValue());
- }
-
- private static Pair<Distribution, double[]> parse(String spec) {
- int paramsStart = spec.indexOf('(');
- int paramsEnd = spec.indexOf(')');
- double[] params = Arrays.stream(spec.substring(paramsStart + 1,
paramsEnd).split(","))
- .mapToDouble(s -> Double.parseDouble(s.trim()))
- .toArray();
- String dist = spec.substring(0, paramsStart).toUpperCase();
- return Pair.of(Distribution.valueOf(dist), params);
- }
- }
-
@State(Scope.Benchmark)
public static class BaseState {
@Param({"INT", "LONG", "FLOAT", "DOUBLE"})
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkScanDocIdIterators.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkScanDocIdIterators.java
new file mode 100644
index 0000000..56e7787
--- /dev/null
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkScanDocIdIterators.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.operator.dociditerators.SVScanDocIdIterator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Benchmark)
+public class BenchmarkScanDocIdIterators {
+
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"BenchmarkScanDocIdIterators");
+
+ @Param("10000000")
+ int _numDocs;
+
+ // how selective the bitmap is
+ @Param({"1", "2"})
+ private int _bitmapQuantile;
+
+ // how selective the predicate evaluator is
+ @Param({"1", "2"})
+ private int _thresholdQuantile;
+
+ @Param("42")
+ long _seed;
+
+ @Param({"UNIFORM(0,10000000)"})
+ String _distribution;
+
+ private DummyPredicateEvaluator _predicateEvaluator;
+ private FixedBitSVForwardIndexReaderV2 _readerV2;
+ private ImmutableRoaringBitmap _bitmap;
+ private PinotDataBuffer _dataBuffer;
+
+ @Setup(Level.Trial)
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(INDEX_DIR);
+ FileUtils.forceMkdir(INDEX_DIR);
+ File indexFile = new File(INDEX_DIR, "index-file");
+ RoaringBitmapWriter<MutableRoaringBitmap> writer =
RoaringBitmapWriter.bufferWriter().get();
+ LongSupplier supplier = Distribution.createLongSupplier(_seed,
_distribution);
+ int[] values = new int[_numDocs];
+ int max = Integer.MIN_VALUE;
+ for (int i = 0; i < values.length; i++) {
+ values[i] = (int) supplier.getAsLong();
+ max = Math.max(values[i], max);
+ }
+ int numBits = 32 - Integer.numberOfLeadingZeros(max);
+ int[] sorted = Arrays.copyOf(values, values.length);
+ Arrays.sort(sorted);
+ try (FixedBitSVForwardIndexWriter indexWriter = new
FixedBitSVForwardIndexWriter(indexFile, _numDocs, numBits)) {
+ for (int i = 0; i < _numDocs; i++) {
+ indexWriter.putDictId(values[i]);
+ }
+ }
+ _dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
+ for (int i = 0; i < values.length; i++) {
+ if (values[i] < sorted[_bitmapQuantile * sorted.length / 10]) {
+ writer.add(i);
+ }
+ }
+ _bitmap = writer.get();
+ _predicateEvaluator = new
DummyPredicateEvaluator(sorted[_thresholdQuantile * sorted.length / 10]);
+ _readerV2 = new FixedBitSVForwardIndexReaderV2(_dataBuffer, values.length,
numBits);
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown()
+ throws Exception {
+ _dataBuffer.close();
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+
+ @Benchmark
+ public MutableRoaringBitmap benchmarkSVLong() {
+ return new SVScanDocIdIterator(_predicateEvaluator, _readerV2,
_numDocs).applyAnd(_bitmap);
+ }
+
+ public static class DummyPredicateEvaluator implements PredicateEvaluator {
+
+ private final int _threshold;
+
+ public DummyPredicateEvaluator(int threshold) {
+ _threshold = threshold;
+ }
+
+ @Override
+ public Predicate.Type getPredicateType() {
+ return null;
+ }
+
+ @Override
+ public boolean isDictionaryBased() {
+ return true;
+ }
+
+ @Override
+ public FieldSpec.DataType getDataType() {
+ return null;
+ }
+
+ @Override
+ public boolean isExclusive() {
+ return false;
+ }
+
+ @Override
+ public boolean isAlwaysTrue() {
+ return false;
+ }
+
+ @Override
+ public boolean isAlwaysFalse() {
+ return false;
+ }
+
+ @Override
+ public boolean applySV(int value) {
+ return value < _threshold;
+ }
+
+ @Override
+ public boolean applyMV(int[] values, int length) {
+ return false;
+ }
+
+ @Override
+ public int getNumMatchingDictIds() {
+ return 0;
+ }
+
+ @Override
+ public int[] getMatchingDictIds() {
+ return new int[0];
+ }
+
+ @Override
+ public int getNumNonMatchingDictIds() {
+ return 0;
+ }
+
+ @Override
+ public int[] getNonMatchingDictIds() {
+ return new int[0];
+ }
+
+ @Override
+ public boolean applySV(long value) {
+ return false;
+ }
+
+ @Override
+ public boolean applyMV(long[] values, int length) {
+ return false;
+ }
+
+ @Override
+ public boolean applySV(float value) {
+ return false;
+ }
+
+ @Override
+ public boolean applyMV(float[] values, int length) {
+ return false;
+ }
+
+ @Override
+ public boolean applySV(double value) {
+ return false;
+ }
+
+ @Override
+ public boolean applyMV(double[] values, int length) {
+ return false;
+ }
+
+ @Override
+ public boolean applySV(String value) {
+ return false;
+ }
+
+ @Override
+ public boolean applyMV(String[] values, int length) {
+ return false;
+ }
+
+ @Override
+ public boolean applySV(byte[] value) {
+ return false;
+ }
+
+ @Override
+ public boolean applyMV(byte[][] values, int length) {
+ return false;
+ }
+ }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
new file mode 100644
index 0000000..7739f09
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/Distribution.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.SplittableRandom;
+import java.util.function.DoubleSupplier;
+import java.util.function.LongSupplier;
+import org.apache.commons.lang3.tuple.Pair;
+
+
+public enum Distribution {
+ NORMAL {
+ @Override
+ public DoubleSupplier createDouble(long seed, double... params) {
+ Random random = new Random(seed);
+ return () -> random.nextGaussian() * params[1] + params[0];
+ }
+ },
+ UNIFORM {
+ @Override
+ public DoubleSupplier createDouble(long seed, double... params) {
+ Random random = new Random(seed);
+ return () -> (params[1] - params[0]) * random.nextDouble() + params[0];
+ }
+ },
+ EXP {
+ @Override
+ public DoubleSupplier createDouble(long seed, double... params) {
+ Random random = new Random(seed);
+ return () -> -(Math.log(random.nextDouble()) / params[0]);
+ }
+ },
+ POWER {
+ @Override
+ public DoubleSupplier createDouble(long seed, double... params) {
+ long min = (long) params[0];
+ long max = (long) params[1];
+ double alpha = params[2];
+ SplittableRandom random = new SplittableRandom(seed);
+ return () -> (Math.pow((Math.pow(max, alpha + 1)
+ - Math.pow(min, alpha + 1) * (random.nextDouble() + 1)), 1D / (alpha
+ 1)));
+ }
+ };
+
+ public LongSupplier createLong(long seed, double... params) {
+ DoubleSupplier source = createDouble(seed, params);
+ return () -> (long) source.getAsDouble();
+ }
+
+ public abstract DoubleSupplier createDouble(long seed, double... params);
+
+ public static LongSupplier createLongSupplier(long seed, String spec) {
+ Pair<Distribution, double[]> parsed = parse(spec);
+ return parsed.getKey().createLong(seed, parsed.getValue());
+ }
+
+ public static DoubleSupplier createDoubleSupplier(long seed, String spec) {
+ Pair<Distribution, double[]> parsed = parse(spec);
+ return parsed.getKey().createDouble(seed, parsed.getValue());
+ }
+
+ private static Pair<Distribution, double[]> parse(String spec) {
+ int paramsStart = spec.indexOf('(');
+ int paramsEnd = spec.indexOf(')');
+ double[] params = Arrays.stream(spec.substring(paramsStart + 1,
paramsEnd).split(","))
+ .mapToDouble(s -> Double.parseDouble(s.trim()))
+ .toArray();
+ String dist = spec.substring(0, paramsStart).toUpperCase();
+ return Pair.of(Distribution.valueOf(dist), params);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]