This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bed2e30 Power of 2 fixed size chunks (#7934)
bed2e30 is described below
commit bed2e307284ca2e43ff6e572be57c01f970645c3
Author: Richard Startin <[email protected]>
AuthorDate: Thu Dec 23 18:48:12 2021 +0000
Power of 2 fixed size chunks (#7934)
* power of 2 fixed-byte chunk reader
* change method name
---
.../BenchmarkFixedByteSVForwardIndexReader.java | 147 +++++++++++++++++++
.../pinot/perf/BenchmarkRawForwardIndexReader.java | 4 +-
.../writer/impl/BaseChunkSVForwardIndexWriter.java | 7 +-
.../impl/FixedByteChunkSVForwardIndexWriter.java | 13 +-
.../impl/VarByteChunkSVForwardIndexWriter.java | 2 +-
.../index/readers/DefaultIndexReaderProvider.java | 7 +-
.../forward/BaseChunkSVForwardIndexReader.java | 49 +------
.../index/readers/forward/ChunkReaderContext.java | 66 +++++++++
.../FixedBytePower2ChunkSVForwardIndexReader.java | 114 +++++++++++++++
.../MultiValueFixedByteRawIndexCreatorTest.java | 2 +-
.../MultiValueVarByteRawIndexCreatorTest.java | 2 +-
.../segment/index/creator/RawIndexCreatorTest.java | 21 ++-
.../forward/FixedByteChunkSVForwardIndexTest.java | 162 ++++++++++-----------
.../forward/VarByteChunkSVForwardIndexTest.java | 12 +-
14 files changed, 449 insertions(+), 159 deletions(-)
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
new file mode 100644
index 0000000..f5c7e37
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+
+
+@State(Scope.Benchmark)
+public class BenchmarkFixedByteSVForwardIndexReader {
+
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(),
"BenchmarkFixedByteSVForwardIndexReader");
+
+ @Param("10000")
+ int _blockSize;
+
+ @Param("1000")
+ int _numBlocks;
+
+ private int[] _docIds;
+ private double[] _doubleBuffer;
+ private long[] _longBuffer;
+ private FixedByteChunkSVForwardIndexReader _compressedReader;
+ private FixedBytePower2ChunkSVForwardIndexReader _compressedPow2Reader;
+
+ @Setup(Level.Trial)
+ public void setup()
+ throws IOException {
+ FileUtils.forceMkdir(INDEX_DIR);
+ File compressedIndexFile = new File(INDEX_DIR,
UUID.randomUUID().toString());
+ File pow2CompressedIndexFile = new File(INDEX_DIR,
UUID.randomUUID().toString());
+ _doubleBuffer = new double[_blockSize];
+ _longBuffer = new long[_blockSize];
+ try (FixedByteChunkSVForwardIndexWriter writer = new
FixedByteChunkSVForwardIndexWriter(compressedIndexFile,
+ ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES,
3);
+ FixedByteChunkSVForwardIndexWriter pow2Writer = new
FixedByteChunkSVForwardIndexWriter(pow2CompressedIndexFile,
+ ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000,
Long.BYTES, 4)) {
+ for (int i = 0; i < _numBlocks * _blockSize; i++) {
+ long next = ThreadLocalRandom.current().nextLong();
+ writer.putLong(next);
+ pow2Writer.putLong(next);
+ }
+ }
+ _compressedReader = new
FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(compressedIndexFile),
+ FieldSpec.DataType.LONG);
+ _compressedPow2Reader =
+ new
FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(pow2CompressedIndexFile),
+ FieldSpec.DataType.LONG);
+ _docIds = new int[_blockSize];
+ }
+
+ @TearDown(Level.Trial)
+ public void teardown()
+ throws IOException {
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+
+ @Benchmark
+ public void readCompressedDoublesNonContiguousV3(Blackhole bh)
+ throws IOException {
+ readCompressedDoublesNonContiguous(bh, _compressedReader);
+ }
+
+ @Benchmark
+ public void readCompressedDoublesNonContiguousV4(Blackhole bh)
+ throws IOException {
+ readCompressedDoublesNonContiguous(bh, _compressedPow2Reader);
+ }
+
+ @Benchmark
+ public void readCompressedLongsNonContiguousV3(Blackhole bh)
+ throws IOException {
+ readCompressedLongsNonContiguous(bh, _compressedReader);
+ }
+
+ @Benchmark
+ public void readCompressedLongsNonContiguousV4(Blackhole bh)
+ throws IOException {
+ readCompressedLongsNonContiguous(bh, _compressedPow2Reader);
+ }
+
+ private void readCompressedLongsNonContiguous(Blackhole bh,
ForwardIndexReader<ChunkReaderContext> reader)
+ throws IOException {
+ try (ChunkReaderContext context = reader.createContext()) {
+ for (int block = 0; block < _numBlocks / 2; block++) {
+ for (int i = 0; i < _docIds.length; i++) {
+ _docIds[i] = block * _blockSize + i * 2;
+ }
+ for (int i = 0; i < _docIds.length; i++) {
+ _longBuffer[i] = reader.getLong(_docIds[i], context);
+ }
+ bh.consume(_longBuffer);
+ }
+ }
+ }
+
+ private void readCompressedDoublesNonContiguous(Blackhole bh,
ForwardIndexReader<ChunkReaderContext> reader)
+ throws IOException {
+ try (ChunkReaderContext context = reader.createContext()) {
+ for (int block = 0; block < _numBlocks / 2; block++) {
+ for (int i = 0; i < _docIds.length; i++) {
+ _docIds[i] = block * _blockSize + i * 2;
+ }
+ for (int i = 0; i < _docIds.length; i++) {
+ _doubleBuffer[i] = reader.getDouble(_docIds[i], context);
+ }
+ bh.consume(_doubleBuffer);
+ }
+ }
+ }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
index 7bc3fd8..ae223e7 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
@@ -27,7 +27,7 @@ import java.util.function.LongSupplier;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -200,7 +200,7 @@ public class BenchmarkRawForwardIndexReader {
try (PinotDataBuffer buffer =
PinotDataBuffer.loadBigEndianFile(state._file);
VarByteChunkSVForwardIndexReader reader =
new VarByteChunkSVForwardIndexReader(buffer,
FieldSpec.DataType.BYTES);
- BaseChunkSVForwardIndexReader.ChunkReaderContext context =
reader.createContext()) {
+ ChunkReaderContext context = reader.createContext()) {
for (int i = 0; i < state._records; i++) {
bh.consume(reader.getBytes(i, context));
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
index 11b4361..71588d6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
@@ -65,12 +65,14 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
* @param chunkSize Size of chunk
* @param sizeOfEntry Size of entry (in bytes), max size for variable byte
implementation.
* @param version version of File
+ * @param fixed if the data type is fixed width (required for version
validation)
* @throws IOException if the file isn't found or can't be mapped
*/
protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
- int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
+ int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version,
boolean fixed)
throws IOException {
- Preconditions.checkArgument(version == DEFAULT_VERSION || version ==
CURRENT_VERSION);
+ Preconditions.checkArgument(version == DEFAULT_VERSION || version ==
CURRENT_VERSION
+ || (fixed && version == 4));
_chunkSize = chunkSize;
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
_headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
@@ -87,6 +89,7 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
case 2:
return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2;
case 3:
+ case 4:
return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3;
default:
throw new IllegalStateException("Invalid version: " + version);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
index 8d9ad7e..7b942b1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
@@ -71,8 +71,9 @@ public class FixedByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexW
public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
int numDocsPerChunk, int sizeOfEntry, int writerVersion)
throws IOException {
- super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry *
numDocsPerChunk), sizeOfEntry,
- writerVersion);
+ super(file, compressionType, totalDocs,
normalizeDocsPerChunk(writerVersion, numDocsPerChunk),
+ (sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)),
sizeOfEntry,
+ writerVersion, true);
_chunkDataOffset = 0;
}
@@ -112,4 +113,12 @@ public class FixedByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexW
writeChunk();
}
}
+
+ private static int normalizeDocsPerChunk(int version, int numDocsPerChunk) {
+ // V4 uses power of 2 chunk sizes for random access efficiency
+ if (version >= 4 && (numDocsPerChunk & (numDocsPerChunk - 1)) != 0) {
+ return 1 << (32 - Integer.numberOfLeadingZeros(numDocsPerChunk - 1));
+ }
+ return numDocsPerChunk;
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index f035a62..7e99772 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -80,7 +80,7 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
super(file, compressionType, totalDocs, numDocsPerChunk,
numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE +
lengthOfLongestEntry),
// chunkSize
- lengthOfLongestEntry, writerVersion);
+ lengthOfLongestEntry, writerVersion, false);
_chunkHeaderOffset = 0;
_chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java
index 79cb55d..a342e85 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java
@@ -30,6 +30,7 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVFo
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
@@ -89,10 +90,12 @@ public class DefaultIndexReaderProvider implements IndexReaderProvider {
} else {
FieldSpec.DataType storedType =
columnMetadata.getDataType().getStoredType();
if (columnMetadata.isSingleValue()) {
+ int version = dataBuffer.getInt(0);
if (storedType.isFixedWidth()) {
- return new FixedByteChunkSVForwardIndexReader(dataBuffer,
storedType);
+ return version >= FixedBytePower2ChunkSVForwardIndexReader.VERSION
+ ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer,
storedType)
+ : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
}
- int version = dataBuffer.getInt(0);
if (version >= VarByteChunkSVForwardIndexWriterV4.VERSION) {
return new VarByteChunkSVForwardIndexReaderV4(dataBuffer,
storedType);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
index 56090ca..dce4a90 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
@@ -26,8 +26,6 @@ import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWrit
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.segment.spi.memory.CleanerUtil;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.slf4j.Logger;
@@ -37,8 +35,7 @@ import org.slf4j.LoggerFactory;
/**
* Base implementation for chunk-based single-value raw
(non-dictionary-encoded) forward index reader.
*/
-public abstract class BaseChunkSVForwardIndexReader
- implements
ForwardIndexReader<BaseChunkSVForwardIndexReader.ChunkReaderContext> {
+public abstract class BaseChunkSVForwardIndexReader implements
ForwardIndexReader<ChunkReaderContext> {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseChunkSVForwardIndexReader.class);
protected final PinotDataBuffer _dataBuffer;
@@ -114,7 +111,10 @@ public abstract class BaseChunkSVForwardIndexReader
if (context.getChunkId() == chunkId) {
return context.getChunkBuffer();
}
+ return decompressChunk(chunkId, context);
+ }
+ protected ByteBuffer decompressChunk(int chunkId, ChunkReaderContext
context) {
int chunkSize;
long chunkPosition = getChunkPosition(chunkId);
@@ -172,45 +172,4 @@ public abstract class BaseChunkSVForwardIndexReader
// NOTE: DO NOT close the PinotDataBuffer here because it is tracked by
the caller and might be reused later. The
// caller is responsible of closing the PinotDataBuffer.
}
-
- /**
- * Context for the chunk-based forward index readers.
- * <p>Information saved in the context can be used by subsequent reads as
cache:
- * <ul>
- * <li>
- * Chunk Buffer from the previous read. Useful if the subsequent read is
from the same buffer, as it avoids extra
- * chunk decompression.
- * </li>
- * <li>Id for the chunk</li>
- * </ul>
- */
- public static class ChunkReaderContext implements ForwardIndexReaderContext {
- private final ByteBuffer _chunkBuffer;
- private int _chunkId;
-
- public ChunkReaderContext(int maxChunkSize) {
- _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize);
- _chunkId = -1;
- }
-
- public ByteBuffer getChunkBuffer() {
- return _chunkBuffer;
- }
-
- public int getChunkId() {
- return _chunkId;
- }
-
- public void setChunkId(int chunkId) {
- _chunkId = chunkId;
- }
-
- @Override
- public void close()
- throws IOException {
- if (CleanerUtil.UNMAP_SUPPORTED) {
- CleanerUtil.getCleaner().freeBuffer(_chunkBuffer);
- }
- }
- }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java
new file mode 100644
index 0000000..1adf685
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+
+
+/**
+ * Context for the chunk-based forward index readers.
+ * <p>Information saved in the context can be used by subsequent reads as
cache:
+ * <ul>
+ * <li>
+ * Chunk Buffer from the previous read. Useful if the subsequent read is
from the same buffer, as it avoids extra
+ * chunk decompression.
+ * </li>
+ * <li>Id for the chunk</li>
+ * </ul>
+ */
+public class ChunkReaderContext implements ForwardIndexReaderContext {
+ private final ByteBuffer _chunkBuffer;
+ private int _chunkId;
+
+ public ChunkReaderContext(int maxChunkSize) {
+ _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize);
+ _chunkId = -1;
+ }
+
+ public ByteBuffer getChunkBuffer() {
+ return _chunkBuffer;
+ }
+
+ public int getChunkId() {
+ return _chunkId;
+ }
+
+ public void setChunkId(int chunkId) {
+ _chunkId = chunkId;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ if (CleanerUtil.UNMAP_SUPPORTED) {
+ CleanerUtil.getCleaner().freeBuffer(_chunkBuffer);
+ }
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
new file mode 100644
index 0000000..0effd62
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader
for values of fixed length data type (INT,
+ * LONG, FLOAT, DOUBLE).
+ * <p>For data layout, please refer to the documentation for {@link
FixedByteChunkSVForwardIndexWriter}
+ */
+public final class FixedBytePower2ChunkSVForwardIndexReader extends
BaseChunkSVForwardIndexReader {
+ public static final int VERSION = 4;
+
+ private final int _shift;
+
+ public FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer dataBuffer,
DataType valueType) {
+ super(dataBuffer, valueType);
+ _shift = Integer.numberOfTrailingZeros(_numDocsPerChunk);
+ }
+
+ @Nullable
+ @Override
+ public ChunkReaderContext createContext() {
+ if (_isCompressed) {
+ return new ChunkReaderContext(_numDocsPerChunk * _valueType.size());
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getInt(int docId, ChunkReaderContext context) {
+ if (_isCompressed) {
+ int chunkRowId = docId & (_numDocsPerChunk - 1);
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+ return chunkBuffer.getInt(chunkRowId * Integer.BYTES);
+ } else {
+ return _rawData.getInt(docId * Integer.BYTES);
+ }
+ }
+
+ @Override
+ public long getLong(int docId, ChunkReaderContext context) {
+ if (_isCompressed) {
+ int chunkRowId = docId & (_numDocsPerChunk - 1);
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+ return chunkBuffer.getLong(chunkRowId * Long.BYTES);
+ } else {
+ return _rawData.getLong(docId * Long.BYTES);
+ }
+ }
+
+ @Override
+ public float getFloat(int docId, ChunkReaderContext context) {
+ if (_isCompressed) {
+ int chunkRowId = docId & (_numDocsPerChunk - 1);
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+ return chunkBuffer.getFloat(chunkRowId * Float.BYTES);
+ } else {
+ return _rawData.getFloat(docId * Float.BYTES);
+ }
+ }
+
+ @Override
+ public double getDouble(int docId, ChunkReaderContext context) {
+ if (_isCompressed) {
+ int chunkRowId = docId & (_numDocsPerChunk - 1);
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+ return chunkBuffer.getDouble(chunkRowId * Double.BYTES);
+ } else {
+ return _rawData.getDouble(docId * Double.BYTES);
+ }
+ }
+
+ /**
+ * Helper method to return the chunk buffer that contains the value at the
given document id.
+ * <ul>
+ * <li> If the chunk already exists in the reader context, returns the
same. </li>
+ * <li> Otherwise, loads the chunk for the row, and sets it in the reader
context. </li>
+ * </ul>
+ * @param docId Document id
+ * @param context Reader context
+ * @return Chunk for the row
+ */
+ protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) {
+ int chunkId = docId >>> _shift;
+ if (context.getChunkId() == chunkId) {
+ return context.getChunkBuffer();
+ }
+ return decompressChunk(chunkId, context);
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index e4b6c15..fb49dce 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index f5ad70d..fb645f5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -27,7 +27,7 @@ import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 890292e..af99c5a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -31,7 +31,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.common.utils.StringUtil;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
@@ -169,11 +169,9 @@ public class RawIndexCreatorTest {
public void testStringRawIndexCreator()
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
- try (VarByteChunkSVForwardIndexReader rawIndexReader = new
VarByteChunkSVForwardIndexReader(
- indexBuffer,
+ try (VarByteChunkSVForwardIndexReader rawIndexReader = new
VarByteChunkSVForwardIndexReader(indexBuffer,
DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
rawIndexReader
- .createContext()) {
+ ChunkReaderContext readerContext = rawIndexReader.createContext()) {
_recordReader.rewind();
for (int row = 0; row < NUM_ROWS; row++) {
GenericRow expectedRow = _recordReader.next();
@@ -193,9 +191,8 @@ public class RawIndexCreatorTest {
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(column);
try (FixedByteChunkSVForwardIndexReader rawIndexReader = new
FixedByteChunkSVForwardIndexReader(
- indexBuffer,
- dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext
readerContext = rawIndexReader
- .createContext()) {
+ indexBuffer, dataType);
+ ChunkReaderContext readerContext = rawIndexReader.createContext()) {
_recordReader.rewind();
for (int row = 0; row < NUM_ROWS; row++) {
GenericRow expectedRow = _recordReader.next();
@@ -216,7 +213,7 @@ public class RawIndexCreatorTest {
try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(
indexBuffer,
DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
rawIndexReader
+ ChunkReaderContext readerContext = rawIndexReader
.createContext()) {
_recordReader.rewind();
int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
@@ -250,7 +247,7 @@ public class RawIndexCreatorTest {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(
indexBuffer, DataType.BYTES);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
rawIndexReader
+ ChunkReaderContext readerContext = rawIndexReader
.createContext()) {
_recordReader.rewind();
int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
@@ -373,8 +370,8 @@ public class RawIndexCreatorTest {
* @param docId Document id
* @return Value read from index
*/
- private Object readValueFromIndex(FixedByteChunkSVForwardIndexReader
rawIndexReader,
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext, int
docId) {
+ private Object readValueFromIndex(FixedByteChunkSVForwardIndexReader
rawIndexReader, ChunkReaderContext readerContext,
+ int docId) {
switch (rawIndexReader.getValueType()) {
case INT:
return rawIndexReader.getInt(docId, readerContext);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index 9afe093..0e2386b 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -20,16 +20,20 @@ package
org.apache.pinot.segment.local.segment.index.forward;
import java.io.File;
import java.net.URL;
+import java.util.Arrays;
import java.util.Random;
+import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -49,47 +53,16 @@ public class FixedByteChunkSVForwardIndexTest {
private static final String TEST_FILE = System.getProperty("java.io.tmpdir")
+ File.separator + "FixedByteSVRTest";
private static final Random RANDOM = new Random();
- @Test
- public void testWithCompression()
- throws Exception {
- ChunkCompressionType compressionType = ChunkCompressionType.SNAPPY;
- testInt(compressionType);
- testLong(compressionType);
- testFloat(compressionType);
- testDouble(compressionType);
- }
-
- @Test
- public void testWithoutCompression()
- throws Exception {
- ChunkCompressionType compressionType = ChunkCompressionType.PASS_THROUGH;
- testInt(compressionType);
- testLong(compressionType);
- testFloat(compressionType);
- testDouble(compressionType);
- }
-
- @Test
- public void testWithZstandardCompression()
- throws Exception {
- ChunkCompressionType compressionType = ChunkCompressionType.ZSTANDARD;
- testInt(compressionType);
- testLong(compressionType);
- testFloat(compressionType);
- testDouble(compressionType);
- }
-
- @Test
- public void testWithLZ4Compression()
- throws Exception {
- ChunkCompressionType compressionType = ChunkCompressionType.LZ4;
- testInt(compressionType);
- testLong(compressionType);
- testFloat(compressionType);
- testDouble(compressionType);
+ @DataProvider(name = "combinations")
+ public static Object[][] combinations() {
+ return Arrays.stream(ChunkCompressionType.values())
+ .flatMap(chunkCompressionType -> IntStream.of(2, 3, 4)
+ .mapToObj(version -> new Object[]{chunkCompressionType, version}))
+ .toArray(Object[][]::new);
}
- public void testInt(ChunkCompressionType compressionType)
+ @Test(dataProvider = "combinations")
+ public void testInt(ChunkCompressionType compressionType, int version)
throws Exception {
int[] expected = new int[NUM_VALUES];
for (int i = 0; i < NUM_VALUES; i++) {
@@ -103,24 +76,28 @@ public class FixedByteChunkSVForwardIndexTest {
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Integer.BYTES,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Integer.BYTES, version);
FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Integer.BYTES,
- BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+ outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Integer.BYTES, version)) {
for (int value : expected) {
fourByteOffsetWriter.putInt(value);
eightByteOffsetWriter.putInt(value);
}
}
- try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.INT);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
fourByteOffsetReaderContext = fourByteOffsetReader
+ try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version
>= 4
+ ? new FixedBytePower2ChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.INT)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.INT);
+ ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
.createContext();
- FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.INT);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
eightByteOffsetReaderContext = eightByteOffsetReader
+ ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version
>= 4
+ ? new FixedBytePower2ChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.INT)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.INT);
+ ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
.createContext()) {
for (int i = 0; i < NUM_VALUES; i++) {
Assert.assertEquals(fourByteOffsetReader.getInt(i,
fourByteOffsetReaderContext), expected[i]);
@@ -132,7 +109,8 @@ public class FixedByteChunkSVForwardIndexTest {
FileUtils.deleteQuietly(outFileEightByte);
}
- public void testLong(ChunkCompressionType compressionType)
+ @Test(dataProvider = "combinations")
+ public void testLong(ChunkCompressionType compressionType, int version)
throws Exception {
long[] expected = new long[NUM_VALUES];
for (int i = 0; i < NUM_VALUES; i++) {
@@ -146,24 +124,28 @@ public class FixedByteChunkSVForwardIndexTest {
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Long.BYTES,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Long.BYTES, version);
FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Long.BYTES,
- BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+ outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Long.BYTES, version)) {
for (long value : expected) {
fourByteOffsetWriter.putLong(value);
eightByteOffsetWriter.putLong(value);
}
}
- try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.LONG);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
fourByteOffsetReaderContext = fourByteOffsetReader
+ try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version
>= 4
+ ? new FixedBytePower2ChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.LONG)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.LONG);
+ ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
.createContext();
- FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.LONG);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
eightByteOffsetReaderContext = eightByteOffsetReader
+ ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version
>= 4
+ ? new FixedBytePower2ChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.LONG)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.LONG);
+ ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
.createContext()) {
for (int i = 0; i < NUM_VALUES; i++) {
Assert.assertEquals(fourByteOffsetReader.getLong(i,
fourByteOffsetReaderContext), expected[i]);
@@ -175,7 +157,8 @@ public class FixedByteChunkSVForwardIndexTest {
FileUtils.deleteQuietly(outFileEightByte);
}
- public void testFloat(ChunkCompressionType compressionType)
+ @Test(dataProvider = "combinations")
+ public void testFloat(ChunkCompressionType compressionType, int version)
throws Exception {
float[] expected = new float[NUM_VALUES];
for (int i = 0; i < NUM_VALUES; i++) {
@@ -189,24 +172,28 @@ public class FixedByteChunkSVForwardIndexTest {
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Float.BYTES,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Float.BYTES, version);
FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Float.BYTES,
- BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+ outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Float.BYTES, version)) {
for (float value : expected) {
fourByteOffsetWriter.putFloat(value);
eightByteOffsetWriter.putFloat(value);
}
}
- try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.FLOAT);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
fourByteOffsetReaderContext = fourByteOffsetReader
+ try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version
>= 4
+ ? new FixedBytePower2ChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.FLOAT)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.FLOAT);
+ ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
.createContext();
- FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.FLOAT);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
eightByteOffsetReaderContext = eightByteOffsetReader
+ ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version
>= 4
+ ? new
FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
+ DataType.FLOAT)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.FLOAT);
+ ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
.createContext()) {
for (int i = 0; i < NUM_VALUES; i++) {
Assert.assertEquals(fourByteOffsetReader.getFloat(i,
fourByteOffsetReaderContext), expected[i]);
@@ -218,7 +205,8 @@ public class FixedByteChunkSVForwardIndexTest {
FileUtils.deleteQuietly(outFileEightByte);
}
- public void testDouble(ChunkCompressionType compressionType)
+ @Test(dataProvider = "combinations")
+ public void testDouble(ChunkCompressionType compressionType, int version)
throws Exception {
double[] expected = new double[NUM_VALUES];
for (int i = 0; i < NUM_VALUES; i++) {
@@ -232,24 +220,28 @@ public class FixedByteChunkSVForwardIndexTest {
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Double.BYTES,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Double.BYTES, version);
FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Double.BYTES,
- BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+ outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Double.BYTES, version)) {
for (double value : expected) {
fourByteOffsetWriter.putDouble(value);
eightByteOffsetWriter.putDouble(value);
}
}
- try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.DOUBLE);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
fourByteOffsetReaderContext = fourByteOffsetReader
+ try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version
>= 4
+ ? new FixedBytePower2ChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.DOUBLE)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.DOUBLE);
+ ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
.createContext();
- FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new
FixedByteChunkSVForwardIndexReader(
- PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.DOUBLE);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
eightByteOffsetReaderContext = eightByteOffsetReader
+ ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version
>= 4
+ ? new FixedBytePower2ChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.DOUBLE)
+ : new FixedByteChunkSVForwardIndexReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.DOUBLE);
+ ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
.createContext()) {
for (int i = 0; i < NUM_VALUES; i++) {
Assert.assertEquals(fourByteOffsetReader.getDouble(i,
fourByteOffsetReaderContext), expected[i]);
@@ -290,7 +282,7 @@ public class FixedByteChunkSVForwardIndexTest {
File file = new File(resource.getFile());
try (FixedByteChunkSVForwardIndexReader reader = new
FixedByteChunkSVForwardIndexReader(
PinotDataBuffer.mapReadOnlyBigEndianFile(file), DataType.DOUBLE);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
reader.createContext()) {
+ ChunkReaderContext readerContext = reader.createContext()) {
for (int i = 0; i < numDocs; i++) {
double actual = reader.getDouble(i, readerContext);
Assert.assertEquals(actual, i + startValue);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
index 3f7f1c0..f6b8f46 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
@@ -27,7 +27,7 @@ import org.apache.commons.lang.RandomStringUtils;
import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -117,11 +117,11 @@ public class VarByteChunkSVForwardIndexTest {
try (VarByteChunkSVForwardIndexReader fourByteOffsetReader = new
VarByteChunkSVForwardIndexReader(
PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
fourByteOffsetReaderContext = fourByteOffsetReader
+ ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
.createContext();
VarByteChunkSVForwardIndexReader eightByteOffsetReader = new
VarByteChunkSVForwardIndexReader(
PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext
eightByteOffsetReaderContext = eightByteOffsetReader
+ ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
.createContext()) {
for (int i = 0; i < NUM_ENTRIES; i++) {
Assert.assertEquals(fourByteOffsetReader.getString(i,
fourByteOffsetReaderContext), expected[i]);
@@ -164,7 +164,7 @@ public class VarByteChunkSVForwardIndexTest {
File file = new File(resource.getFile());
try (VarByteChunkSVForwardIndexReader reader = new
VarByteChunkSVForwardIndexReader(
PinotDataBuffer.mapReadOnlyBigEndianFile(file), DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
reader.createContext()) {
+ ChunkReaderContext readerContext = reader.createContext()) {
for (int i = 0; i < numDocs; i++) {
String actual = reader.getString(i, readerContext);
Assert.assertEquals(actual, data[i % data.length]);
@@ -237,7 +237,7 @@ public class VarByteChunkSVForwardIndexTest {
PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(outFile);
try (VarByteChunkSVForwardIndexReader reader = new
VarByteChunkSVForwardIndexReader(buffer, DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
reader.createContext()) {
+ ChunkReaderContext readerContext = reader.createContext()) {
for (int i = 0; i < numDocs; i++) {
Assert.assertEquals(reader.getString(i, readerContext), expected[i]);
}
@@ -257,7 +257,7 @@ public class VarByteChunkSVForwardIndexTest {
}
try (VarByteChunkSVForwardIndexReader reader = new
VarByteChunkSVForwardIndexReader(buffer, DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
reader.createContext()) {
+ ChunkReaderContext readerContext = reader.createContext()) {
for (int i = 0; i < numDocs; i++) {
Assert.assertEquals(reader.getString(i, readerContext), expected[i]);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]