This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch acquire_release in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 57a971d8c50be484249341bd80d0893d8fcc95cc Author: Neha Pawar <[email protected]> AuthorDate: Thu Sep 23 18:53:07 2021 -0700 Benchmark --- .../AcquireReleaseColumnsSegmentOperator.java | 25 ++-- .../core/operator/combine/BaseCombineOperator.java | 11 +- .../combine/GroupByOrderByCombineOperator.java | 17 ++- .../plan/AcquireReleaseColumnsSegmentPlanNode.java | 2 +- .../pinot/perf/BenchmarkRoaringBitmapCreation.java | 164 +++++++++++++++++++++ .../index/readers/NullValueVectorReaderImpl.java | 9 +- .../index/readers/RangeIndexReaderImpl.java | 36 ----- 7 files changed, 207 insertions(+), 57 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java index 127f38f..04dc79b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.operator; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.plan.PlanNode; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.IndexSegment; @@ -31,13 +32,13 @@ import org.apache.pinot.segment.spi.IndexSegment; public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator"; - private final Operator _childOperator; + private final PlanNode _planNode; private final IndexSegment _indexSegment; private final FetchContext _fetchContext; + private Operator _childOperator; - public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment, - FetchContext fetchContext) { - _childOperator = childOperator; + public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) { + _planNode = planNode; _indexSegment = indexSegment; _fetchContext = fetchContext; } @@ -49,12 +50,16 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { */ @Override protected Block getNextBlock() { + _childOperator = _planNode.run(); + return _childOperator.nextBlock(); + } + + public void acquire() { _indexSegment.acquire(_fetchContext); - try { - return _childOperator.nextBlock(); - } finally { - _indexSegment.release(_fetchContext); - } + } + + public void release() { + _indexSegment.release(_fetchContext); } @Override @@ -64,6 +69,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { @Override public ExecutionStatistics getExecutionStatistics() { - return _childOperator.getExecutionStatistics(); + return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index eeb27c2..89c6f1f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; @@ -146,8 +147,12 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul */ protected void processSegments(int taskIndex) { for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) { + Operator operator = _operators.get(operatorIndex); try { - IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock(); + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock(); if (isQuerySatisfied(resultsBlock)) { // Query is satisfied, skip processing the remaining segments _blockingQueue.offer(resultsBlock); @@ -164,6 +169,10 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul e); _blockingQueue.offer(new IntermediateResultsBlock(e)); return; + } finally { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).release(); + } } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index b2d9373..d6c5dac 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -41,6 +41,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; @@ -125,9 +126,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { */ @Override protected void processSegments(int threadIndex) { + Operator operator = _operators.get(threadIndex); try { - IntermediateResultsBlock intermediateResultsBlock = - (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock(); + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock(); _initLock.lock(); try { @@ -186,9 +190,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { // Early-terminated because query times out or is already satisfied } catch (Exception e) { LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, " - + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e); + + "queryContext: {}", threadIndex, operator.getClass().getName(), _queryContext, e); _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } finally { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).release(); + } _operatorLatch.countDown(); } } @@ -213,8 +220,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); if (!opCompleted) { // If this happens, the broker side should already timed out, just log the error and return - String errorMessage = - String.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs, + String errorMessage = String + .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs, _queryContext); LOGGER.error(errorMessage); return new IntermediateResultsBlock(new TimeoutException(errorMessage)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java index 5a9f506..5e517e1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java @@ -41,6 +41,6 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode { @Override public AcquireReleaseColumnsSegmentOperator run() { - return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext); + return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext); } } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java new file mode 100644 index 0000000..2b43508 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import java.io.File; +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.math.RandomUtils; +import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter; +import org.apache.pinot.segment.spi.memory.PinotByteBuffer; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + + +@State(Scope.Benchmark) +@Fork(value = 1, jvmArgs = {"-server", "-Xmx8G", "-XX:MaxDirectMemorySize=16G"}) +public class BenchmarkRoaringBitmapCreation { + + private static final int NUM_DOCS = 1_000_000; + private static final int NUM_READS = 10000; + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), "bitmap_creation_benchmark_" + System.currentTimeMillis()); + + @Param({"1000", "10000", "100000"}) // 1k, 10k, 100k + public int _cardinality; + + private int _numBitmaps; + private BitmapInvertedIndexWriter _bitmapInvertedIndexWriter; + private SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps; + private PinotDataBuffer _offsetBuffer; + private PinotDataBuffer _bitmapBuffer; + private int _firstOffset; + private int[] _dictIdsToQuery; + + @Setup + public void setup() + throws IllegalAccessException, InstantiationException, IOException { + _numBitmaps = _cardinality; + File bufferDir = new File(TEMP_DIR, "cardinality_" + _cardinality); + FileUtils.forceMkdir(bufferDir); + File bufferFile = new File(bufferDir, "buffer"); + _bitmapInvertedIndexWriter = new BitmapInvertedIndexWriter(bufferFile, _numBitmaps); + // Insert between 10-1000 values per bitmap + for (int i = 0; i < _numBitmaps; i++) { + int size = 10 + RandomUtils.nextInt(990); + int[] data = new int[size]; + for (int j = 0; j < size; j++) { + data[j] = RandomUtils.nextInt(NUM_DOCS); // docIds will repeat across bitmaps, but doesn't matter for purpose of this benchmark + } + RoaringBitmap bitmap = RoaringBitmap.bitmapOf(data); + _bitmapInvertedIndexWriter.add(bitmap); + } + PinotDataBuffer dataBuffer = PinotByteBuffer.mapReadOnlyBigEndianFile(bufferFile); + long offsetBufferEndOffset = (long) (_numBitmaps + 1) * Integer.BYTES; + _offsetBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN); + _bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size()); + _firstOffset = _offsetBuffer.getInt(0); + + // A fixed set of dictIds to read. This ensures same bitmap accessed multiple times. + _dictIdsToQuery = new int[100]; + for (int i = 0; i < 100; i++) { + _dictIdsToQuery[i] = RandomUtils.nextInt(_cardinality); + } + } + + @TearDown + public void teardown() + throws IOException { + _bitmapInvertedIndexWriter.close(); + FileUtils.deleteQuietly(TEMP_DIR); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void cacheReferences() { + _bitmaps = null; + for (int i = 0; i < NUM_READS; i++) { + int dictId = _dictIdsToQuery[RandomUtils.nextInt(_dictIdsToQuery.length)]; + getRoaringBitmapFromCache(dictId); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void alwaysBuild() { + for (int i = 0; i < NUM_READS; i++) { + int dictId = _dictIdsToQuery[RandomUtils.nextInt(_dictIdsToQuery.length)]; + buildRoaringBitmap(dictId); + } + } + + private ImmutableRoaringBitmap getRoaringBitmapFromCache(int dictId) { + SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = (_bitmaps != null) ? _bitmaps.get() : null; + if (bitmapArrayReference != null) { + SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId]; + ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null; + if (bitmap != null) { + return bitmap; + } + } else { + bitmapArrayReference = new SoftReference[_numBitmaps]; + _bitmaps = new SoftReference<>(bitmapArrayReference); + } + synchronized (this) { + SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId]; + ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null; + if (bitmap == null) { + bitmap = buildRoaringBitmap(dictId); + bitmapArrayReference[dictId] = new SoftReference<>(bitmap); + } + return bitmap; + } + } + + private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) { + int offset = _offsetBuffer.getInt(dictId * Integer.BYTES); + int length = _offsetBuffer.getInt((dictId + 1) * Integer.BYTES) - offset; + return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(offset - _firstOffset, length)); + } + + public static void main(String[] args) + throws Exception { + ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkRoaringBitmapCreation.class.getSimpleName()) + .warmupTime(TimeValue.seconds(10)).warmupIterations(1).measurementTime(TimeValue.seconds(5)) + .measurementIterations(1).forks(1); + new Runner(opt.build()).run(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java index 7a60cd8..9dc6112 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java @@ -25,18 +25,19 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap; public class NullValueVectorReaderImpl implements NullValueVectorReader { - private final ImmutableRoaringBitmap _nullBitmap; + private final PinotDataBuffer _dataBuffer; public NullValueVectorReaderImpl(PinotDataBuffer dataBuffer) { - _nullBitmap = new ImmutableRoaringBitmap(dataBuffer.toDirectByteBuffer(0, (int) dataBuffer.size())); + _dataBuffer = dataBuffer; } public boolean isNull(int docId) { - return _nullBitmap.contains(docId); + ImmutableRoaringBitmap nullBitmap = new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size())); + return nullBitmap.contains(docId); } @Override public ImmutableRoaringBitmap getNullBitmap() { - return _nullBitmap; + return new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size())); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java index a2733ce..697fda1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java @@ -19,7 +19,6 @@ package org.apache.pinot.segment.local.segment.index.readers; import com.google.common.base.Preconditions; -import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import javax.annotation.Nullable; import org.apache.pinot.segment.spi.index.reader.RangeIndexReader; @@ -43,8 +42,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi private final Number[] _rangeStartArray; private final Number _lastRangeEnd; - private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps; - public RangeIndexReaderImpl(PinotDataBuffer dataBuffer) { _dataBuffer = dataBuffer; long offset = 0; @@ -177,39 +174,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi } private ImmutableRoaringBitmap getDocIds(int rangeId) { - SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null; - // Return the bitmap if it's still on heap - if (_bitmaps != null) { - bitmapArrayReference = _bitmaps.get(); - if (bitmapArrayReference != null) { - SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId]; - if (bitmapReference != null) { - ImmutableRoaringBitmap value = bitmapReference.get(); - if (value != null) { - return value; - } - } - } else { - bitmapArrayReference = new SoftReference[_numRanges]; - _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference); - } - } else { - bitmapArrayReference = new SoftReference[_numRanges]; - _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference); - } - synchronized (this) { - ImmutableRoaringBitmap value; - if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) { - value = buildRoaringBitmapForIndex(rangeId); - bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value); - } else { - value = bitmapArrayReference[rangeId].get(); - } - return value; - } - } - - private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) { final long currentOffset = getOffset(rangeId); final long nextOffset = getOffset(rangeId + 1); final int bufferLength = (int) (nextOffset - currentOffset); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
