This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aa5d65fccf Cleanup some reader/writer logic for raw forward index (#11669)
aa5d65fccf is described below
commit aa5d65fccf87623970d2c5bdae8019d5d09c6ae7
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Sep 25 14:39:11 2023 -0700
Cleanup some reader/writer logic for raw forward index (#11669)
---
.../BenchmarkFixedByteSVForwardIndexReader.java | 11 +-
.../pinot/perf/BenchmarkRawForwardIndexReader.java | 8 +-
.../pinot/perf/BenchmarkRawForwardIndexWriter.java | 8 +-
...riter.java => BaseChunkForwardIndexWriter.java} | 79 +++---
....java => FixedByteChunkForwardIndexWriter.java} | 34 +--
...er.java => VarByteChunkForwardIndexWriter.java} | 35 +--
....java => VarByteChunkForwardIndexWriterV4.java} | 45 +++-
.../creator/impl/SegmentColumnarIndexCreator.java | 40 +--
.../fwd/MultiValueFixedByteRawIndexCreator.java | 68 ++---
.../impl/fwd/MultiValueVarByteRawIndexCreator.java | 29 +-
.../fwd/SingleValueFixedByteRawIndexCreator.java | 12 +-
.../fwd/SingleValueVarByteRawIndexCreator.java | 19 +-
.../index/forward/ForwardIndexCreatorFactory.java | 98 +++----
.../index/forward/ForwardIndexReaderFactory.java | 39 +--
.../segment/index/loader/ForwardIndexHandler.java | 1 +
.../ColumnMinMaxValueGenerator.java | 293 +++++++++------------
.../forward/BaseChunkForwardIndexReader.java | 5 +-
.../FixedByteChunkMVForwardIndexReader.java | 17 +-
.../FixedByteChunkSVForwardIndexReader.java | 4 +-
.../FixedBytePower2ChunkSVForwardIndexReader.java | 4 +-
.../forward/VarByteChunkMVForwardIndexReader.java | 24 +-
.../forward/VarByteChunkSVForwardIndexReader.java | 6 +-
.../VarByteChunkSVForwardIndexReaderV4.java | 54 ++--
.../startree/v2/store/StarTreeLoaderUtils.java | 13 +-
.../impl/VarByteChunkSVForwardIndexWriterTest.java | 70 +++--
.../segment/index/creator/RawIndexCreatorTest.java | 10 +-
.../segment/index/creator/VarByteChunkV4Test.java | 10 +-
.../forward/FixedByteChunkSVForwardIndexTest.java | 39 ++-
.../index/forward/ForwardIndexTypeTest.java | 10 +-
.../forward/VarByteChunkSVForwardIndexTest.java | 29 +-
.../converter/DictionaryToRawIndexConverter.java | 4 +-
31 files changed, 516 insertions(+), 602 deletions(-)
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
index 2c2444345c..27d1c90f98 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
@@ -69,18 +69,17 @@ public class BenchmarkFixedByteSVForwardIndexReader {
File pow2CompressedIndexFile = new File(INDEX_DIR,
UUID.randomUUID().toString());
_doubleBuffer = new double[_blockSize];
_longBuffer = new long[_blockSize];
- try (FixedByteChunkSVForwardIndexWriter writer = new
FixedByteChunkSVForwardIndexWriter(compressedIndexFile,
+ try (FixedByteChunkForwardIndexWriter writer = new
FixedByteChunkForwardIndexWriter(compressedIndexFile,
ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES,
3);
- FixedByteChunkSVForwardIndexWriter passthroughWriter = new
FixedByteChunkSVForwardIndexWriter(
- uncompressedIndexFile,
+ FixedByteChunkForwardIndexWriter passThroughWriter = new
FixedByteChunkForwardIndexWriter(uncompressedIndexFile,
ChunkCompressionType.PASS_THROUGH, _numBlocks * _blockSize, 1000,
Long.BYTES, 3);
- FixedByteChunkSVForwardIndexWriter pow2Writer = new
FixedByteChunkSVForwardIndexWriter(pow2CompressedIndexFile,
+ FixedByteChunkForwardIndexWriter pow2Writer = new
FixedByteChunkForwardIndexWriter(pow2CompressedIndexFile,
ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000,
Long.BYTES, 4)) {
for (int i = 0; i < _numBlocks * _blockSize; i++) {
long next = ThreadLocalRandom.current().nextLong();
writer.putLong(next);
pow2Writer.putLong(next);
- passthroughWriter.putLong(next);
+ passThroughWriter.putLong(next);
}
}
_compressedReader = new
FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(compressedIndexFile),
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
index ae223e7bb9..6da32f2906 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
@@ -25,8 +25,8 @@ import java.util.SplittableRandom;
import java.util.UUID;
import java.util.function.LongSupplier;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
@@ -137,7 +137,7 @@ public class BenchmarkRawForwardIndexReader {
throws IOException {
super.setup();
_file = new File(TARGET_DIR, UUID.randomUUID().toString());
- try (VarByteChunkSVForwardIndexWriterV4 writer = new
VarByteChunkSVForwardIndexWriterV4(_file,
+ try (VarByteChunkForwardIndexWriterV4 writer = new
VarByteChunkForwardIndexWriterV4(_file,
_chunkCompressionType, _maxChunkSize)) {
for (int i = 0; i < _records; i++) {
writer.putBytes(_bytes[i]);
@@ -163,7 +163,7 @@ public class BenchmarkRawForwardIndexReader {
throws IOException {
super.setup();
_file = new File(TARGET_DIR, UUID.randomUUID().toString());
- try (VarByteChunkSVForwardIndexWriter writer = new
VarByteChunkSVForwardIndexWriter(_file, _chunkCompressionType,
+ try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(_file, _chunkCompressionType,
_records, _maxChunkSize / _maxLength, _maxLength, 3)) {
for (int i = 0; i < _records; i++) {
writer.putBytes(_bytes[i]);
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
index 86d92e22d0..f2938e949d 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexWriter.java
@@ -25,8 +25,8 @@ import java.util.SplittableRandom;
import java.util.UUID;
import java.util.function.LongSupplier;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -138,7 +138,7 @@ public class BenchmarkRawForwardIndexWriter {
public void writeV4(BytesCounter counter)
throws IOException {
try (
- VarByteChunkSVForwardIndexWriterV4 writer = new
VarByteChunkSVForwardIndexWriterV4(_file, _chunkCompressionType,
+ VarByteChunkForwardIndexWriterV4 writer = new
VarByteChunkForwardIndexWriterV4(_file, _chunkCompressionType,
_maxChunkSize)) {
for (int i = 0; i < _records; i++) {
writer.putBytes(_bytes[i]);
@@ -151,7 +151,7 @@ public class BenchmarkRawForwardIndexWriter {
@BenchmarkMode(Mode.SingleShotTime)
public void writeV3(BytesCounter counter)
throws IOException {
- try (VarByteChunkSVForwardIndexWriter writer = new
VarByteChunkSVForwardIndexWriter(_file, _chunkCompressionType,
+ try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(_file, _chunkCompressionType,
_records, _maxChunkSize / _maxLength, _maxLength, 3)) {
for (int i = 0; i < _records; i++) {
writer.putBytes(_bytes[i]);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
similarity index 76%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
index d0c41b0763..70d8f38706 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkForwardIndexWriter.java
@@ -28,22 +28,38 @@ import java.nio.channels.FileChannel;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
-import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Base implementation for chunk-based single-value raw
(non-dictionary-encoded) forward index writer.
+ * Base implementation for chunk-based raw (non-dictionary-encoded) forward
index writer where each chunk contains fixed
+ * number of docs.
+ *
+ * <p>The layout of the file is as follows:
+ * <ul>
+ * <li>Header Section
+ * <ul>
+ * <li>File format version (int)</li>
+ * <li>Total number of chunks (int)</li>
+ * <li>Number of docs per chunk (int)</li>
+ * <li>Size of entry in bytes (int)</li>
+ * <li>Total number of docs (int)</li>
+ * <li>Compression type enum value (int)</li>
+ * <li>Start offset of data header (int)</li>
+ * <li>Data header (start offsets for all chunks)
+ * <ul>
+ * <li>For version 2, offset is stored as int</li>
+ * <li>For version 3 onwards, offset is stored as long</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * <li>Individual Chunks</li>
+ * </ul>
*/
-public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
- // TODO: Remove this before release 0.5.0
- public static final int DEFAULT_VERSION =
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION;
- public static final int CURRENT_VERSION = 3;
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(BaseChunkSVForwardIndexWriter.class);
- private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 =
Integer.BYTES;
- private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES;
+public abstract class BaseChunkForwardIndexWriter implements Closeable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BaseChunkForwardIndexWriter.class);
protected final FileChannel _dataFile;
protected ByteBuffer _header;
@@ -69,15 +85,15 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
* @param fixed if the data type is fixed width (required for version
validation)
* @throws IOException if the file isn't found or can't be mapped
*/
- protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
+ protected BaseChunkForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
int numDocsPerChunk, long chunkSize, int sizeOfEntry, int version,
boolean fixed)
throws IOException {
- Preconditions.checkArgument(version == DEFAULT_VERSION || version ==
CURRENT_VERSION
- || (fixed && version == 4));
- Preconditions.checkArgument(chunkSize <= Integer.MAX_VALUE, "chunk size
limited to 2GB");
+ Preconditions.checkArgument(version == 2 || version == 3 || (fixed &&
version == 4),
+ "Illegal version: %s for %s bytes values", version, fixed ? "fixed" :
"variable");
+ Preconditions.checkArgument(chunkSize <= Integer.MAX_VALUE, "Chunk size
limited to 2GB");
_chunkSize = (int) chunkSize;
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
- _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
+ _headerEntryChunkOffsetSize = version == 2 ? Integer.BYTES : Long.BYTES;
_dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk,
sizeOfEntry, version);
_chunkBuffer = ByteBuffer.allocateDirect(_chunkSize);
int maxCompressedChunkSize =
_chunkCompressor.maxCompressedSize(_chunkSize); // may exceed original chunk
size
@@ -85,19 +101,6 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
_dataFile = new RandomAccessFile(file, "rw").getChannel();
}
- public static int getHeaderEntryChunkOffsetSize(int version) {
- switch (version) {
- case 1:
- case 2:
- return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2;
- case 3:
- case 4:
- return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3;
- default:
- throw new IllegalStateException("Invalid version: " + version);
- }
- }
-
@Override
public void close()
throws IOException {
@@ -143,19 +146,17 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
_header.putInt(sizeOfEntry);
offset += Integer.BYTES;
- if (version > 1) {
- // Write total number of docs.
- _header.putInt(totalDocs);
- offset += Integer.BYTES;
+ // Write total number of docs.
+ _header.putInt(totalDocs);
+ offset += Integer.BYTES;
- // Write the compressor type
- _header.putInt(compressionType.getValue());
- offset += Integer.BYTES;
+ // Write the compressor type
+ _header.putInt(compressionType.getValue());
+ offset += Integer.BYTES;
- // Start of chunk offsets.
- int dataHeaderStart = offset + Integer.BYTES;
- _header.putInt(dataHeaderStart);
- }
+ // Start of chunk offsets.
+ int dataHeaderStart = offset + Integer.BYTES;
+ _header.putInt(dataHeaderStart);
return headerSize;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java
similarity index 70%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java
index 7b942b111b..11a250791b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkForwardIndexWriter.java
@@ -26,34 +26,11 @@ import
org.apache.pinot.segment.spi.compression.ChunkCompressionType;
/**
- * Class to write out fixed length bytes into a single column.
- * Client responsible to ensure that they call the correct set method that
- * matches the sizeOfEntry. Avoiding checks here, as they can be expensive
- * when called for each row.
- *
- * The layout of the file is as follows:
- * <p> Header Section: </p>
- * <ul>
- * <li> Integer: File format version. </li>
- * <li> Integer: Total number of chunks. </li>
- * <li> Integer: Number of docs per chunk. </li>
- * <li> Integer: Length of entry (in bytes). </li>
- * <li> Integer: Total number of docs (version 2 onwards). </li>
- * <li> Integer: Compression type enum value (version 2 onwards). </li>
- * <li> Integer: Start offset of data header (version 2 onwards). </li>
- * <li> Integer array: Integer offsets for all chunks in the data (upto
version 2),
- * Long array: Long offsets for all chunks in the data (version 3 onwards)
</li>
- * </ul>
- *
- * <p> Individual Chunks: </p>
- * <ul>
- * <li> Data bytes. </li>
- * </ul>
- *
- * Only sequential writes are supported.
+ * Chunk-based raw (non-dictionary-encoded) forward index writer where each
chunk contains fixed number of docs, and
+ * each entry has fixed number of bytes.
*/
@NotThreadSafe
-public class FixedByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexWriter {
+public class FixedByteChunkForwardIndexWriter extends
BaseChunkForwardIndexWriter {
private int _chunkDataOffset;
/**
@@ -68,12 +45,11 @@ public class FixedByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexW
* @throws FileNotFoundException Throws {@link FileNotFoundException} if the
specified file is not found.
* @throws IOException Throws {@link IOException} if there are any errors
mapping the underlying ByteBuffer.
*/
- public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
+ public FixedByteChunkForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
int numDocsPerChunk, int sizeOfEntry, int writerVersion)
throws IOException {
super(file, compressionType, totalDocs,
normalizeDocsPerChunk(writerVersion, numDocsPerChunk),
- (sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)),
sizeOfEntry,
- writerVersion, true);
+ (sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)),
sizeOfEntry, writerVersion, true);
_chunkDataOffset = 0;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
similarity index 82%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
index bc55c3482d..b6daaf7fe7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
@@ -30,33 +30,20 @@ import static java.nio.charset.StandardCharsets.UTF_8;
/**
- * Class to write out variable length bytes into a single column.
+ * Chunk-based raw (non-dictionary-encoded) forward index writer where each
chunk contains fixed number of docs, and
+ * the entries are variable length.
*
- * The layout of the file is as follows:
- * <p> Header Section: </p>
+ * <p>The layout of each chunk is as follows:
* <ul>
- * <li> Integer: File format version. </li>
- * <li> Integer: Total number of chunks. </li>
- * <li> Integer: Number of docs per chunk. </li>
- * <li> Integer: Length of longest entry (in bytes). </li>
- * <li> Integer: Total number of docs (version 2 onwards). </li>
- * <li> Integer: Compression type enum value (version 2 onwards). </li>
- * <li> Integer: Start offset of data header (version 2 onwards). </li>
- * <li> Integer array: Integer offsets for all chunks in the data (upto
version 2),
- * Long array: Long offsets for all chunks in the data (version 3 onwards)
</li>
+ * <li>
+ * Header Section: start offsets (stored as int) of the entry within the
data section. For partial chunks, offset
+ * values are 0 for missing entries.
+ * </li>
+ * <li>Data Section</li>
* </ul>
- *
- * <p> Individual Chunks: </p>
- * <ul>
- * <li> Integer offsets to start position of rows: For partial chunks,
offset values are 0 for missing rows. </li>
- * <li> Data bytes. </li>
- * </ul>
- *
- * Only sequential writes are supported.
*/
@NotThreadSafe
-public class VarByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexWriter implements VarByteChunkWriter {
-
+public class VarByteChunkForwardIndexWriter extends
BaseChunkForwardIndexWriter implements VarByteChunkWriter {
public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
private final int _chunkHeaderSize;
@@ -75,8 +62,8 @@ public class VarByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexWri
* @throws FileNotFoundException Throws {@link FileNotFoundException} if the
specified file is
* not found.
*/
- public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType,
- int totalDocs, int numDocsPerChunk, int lengthOfLongestEntry, int
writerVersion)
+ public VarByteChunkForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
+ int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
throws IOException {
super(file, compressionType, totalDocs, numDocsPerChunk,
numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + (long)
lengthOfLongestEntry),
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
similarity index 87%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index 060266ef44..d70ed2dcbc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -39,16 +39,43 @@ import org.slf4j.LoggerFactory;
/**
- * Class to write out variable length bytes into a single column.
+ * Chunk-based raw (non-dictionary-encoded) forward index writer where each
chunk contains variable number of docs, and
+ * the entries are variable length.
*
- *
- * Only sequential writes are supported.
+ * <p>The layout of the file is as follows:
+ * <ul>
+ * <li>Header Section
+ * <ul>
+ * <li>File format version (int)</li>
+ * <li>Target decompressed chunk size (int)</li>
+ * <li>Compression type enum value (int)</li>
+ * <li>Start offset of chunk data (int)</li>
+ * <li>Data header (for each chunk)
+ * <ul>
+ * <li>First docId in the chunk (int), where MSB is used to mark huge
chunk</li>
+ * <li>Start offset of the chunk (unsigned int)</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * <li>Individual Chunks
+ * <ul>
+ * <li>Regular chunk
+ * <ul>
+ * <li>Header Section: start offsets (stored as int) of the entry within
the data section</li>
+ * <li>Data Section</li>
+ * </ul>
+ * </li>
+ * <li>Huge chunk: contains one single value</li>
+ * </ul>
+ * </li>
+ * </ul>
*/
@NotThreadSafe
-public class VarByteChunkSVForwardIndexWriterV4 implements VarByteChunkWriter {
-
+public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
public static final int VERSION = 4;
- private static final Logger LOGGER =
LoggerFactory.getLogger(VarByteChunkSVForwardIndexWriterV4.class);
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(VarByteChunkForwardIndexWriterV4.class);
private static final String DATA_BUFFER_SUFFIX = ".buf";
private final File _dataBuffer;
@@ -63,15 +90,15 @@ public class VarByteChunkSVForwardIndexWriterV4 implements
VarByteChunkWriter {
private int _metadataSize = 0;
private long _chunkOffset = 0;
- public VarByteChunkSVForwardIndexWriterV4(File file, ChunkCompressionType
compressionType, int chunkSize)
+ public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType
compressionType, int chunkSize)
throws IOException {
_dataBuffer = new File(file.getName() + DATA_BUFFER_SUFFIX);
_output = new RandomAccessFile(file, "rw");
_dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType,
true);
_chunkBuffer =
ByteBuffer.allocateDirect(chunkSize).order(ByteOrder.LITTLE_ENDIAN);
- _compressionBuffer =
ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize))
- .order(ByteOrder.LITTLE_ENDIAN);
+ _compressionBuffer =
+
ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize)).order(ByteOrder.LITTLE_ENDIAN);
// reserve space for numDocs
_chunkBuffer.position(Integer.BYTES);
writeHeader(_chunkCompressor.compressionType(), chunkSize);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index c275aec4a5..5ec40796d6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -78,6 +78,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
+
/**
* Segment creator which writes data in a columnar form.
*/
@@ -149,6 +150,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx =
StandardIndexes.forward();
boolean forwardIndexDisabled =
!originalConfig.getConfig(forwardIdx).isEnabled();
+ //@formatter:off
IndexCreationContext.Common context = IndexCreationContext.builder()
.withIndexDir(_indexDir)
.withDictionary(dictEnabledColumn)
@@ -161,6 +163,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
.withForwardIndexDisabled(forwardIndexDisabled)
.withTextCommitOnClose(true)
.build();
+ //@formatter:on
FieldIndexConfigs config = adaptConfig(columnName, originalConfig,
columnIndexCreationInfo, segmentCreationSpec);
@@ -175,8 +178,8 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
LOGGER.info("Creating dictionary index in column {}.{} even when it
is disabled in config",
segmentCreationSpec.getTableName(), columnName);
}
- SegmentDictionaryCreator creator = new
DictionaryIndexPlugin().getIndexType()
- .createIndexCreator(context, dictConfig);
+ SegmentDictionaryCreator creator =
+ new
DictionaryIndexPlugin().getIndexType().createIndexCreator(context, dictConfig);
try {
creator.build(context.getSortedUniqueElementsArray());
@@ -229,9 +232,9 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
// Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
ForwardIndexConfig fwdConfig = config.getConfig(StandardIndexes.forward());
if (!fwdConfig.isEnabled() && columnIndexCreationInfo.isSorted()) {
- builder.add(StandardIndexes.forward(), new
ForwardIndexConfig.Builder(fwdConfig)
- .withLegacyProperties(segmentCreationSpec.getColumnProperties(),
columnName)
- .build());
+ builder.add(StandardIndexes.forward(),
+ new
ForwardIndexConfig.Builder(fwdConfig).withLegacyProperties(segmentCreationSpec.getColumnProperties(),
+ columnName).build());
}
// Initialize inverted index creator; skip creating inverted index if
sorted
if (columnIndexCreationInfo.isSorted()) {
@@ -289,8 +292,8 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
FieldIndexConfigs fieldIndexConfigs =
config.getIndexConfigsByColName().get(column);
if
(DictionaryIndexType.ignoreDictionaryOverride(config.isOptimizeDictionary(),
- config.isOptimizeDictionaryForMetrics(),
config.getNoDictionarySizeRatioThreshold(), spec,
- fieldIndexConfigs, info.getDistinctValueCount(),
info.getTotalNumberOfEntries())) {
+ config.isOptimizeDictionaryForMetrics(),
config.getNoDictionarySizeRatioThreshold(), spec, fieldIndexConfigs,
+ info.getDistinctValueCount(), info.getTotalNumberOfEntries())) {
// Ignore overrides and pick from config
createDictionary = info.isCreateDictionary();
}
@@ -560,7 +563,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
Object min = columnIndexCreationInfo.getMin();
Object max = columnIndexCreationInfo.getMax();
if (min != null && max != null) {
- addColumnMinMaxValueInfo(properties, column, min.toString(),
max.toString(), fieldSpec.getDataType());
+ addColumnMinMaxValueInfo(properties, column, min.toString(),
max.toString(), dataType.getStoredType());
}
}
@@ -574,19 +577,18 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
}
public static void addColumnMinMaxValueInfo(PropertiesConfiguration
properties, String column, String minValue,
- String maxValue, DataType dataType) {
- properties.setProperty(getKeyFor(column, MIN_VALUE),
getValidPropertyValue(minValue, false, dataType));
- properties.setProperty(getKeyFor(column, MAX_VALUE),
getValidPropertyValue(maxValue, true, dataType));
+ String maxValue, DataType storedType) {
+ properties.setProperty(getKeyFor(column, MIN_VALUE),
getValidPropertyValue(minValue, false, storedType));
+ properties.setProperty(getKeyFor(column, MAX_VALUE),
getValidPropertyValue(maxValue, true, storedType));
}
/**
* Helper method to get the valid value for setting min/max.
*/
- private static String getValidPropertyValue(String value, boolean isMax,
DataType dataType) {
- String valueWithinLengthLimit = getValueWithinLengthLimit(value, isMax,
dataType);
- return dataType.getStoredType() == DataType.STRING
- ?
CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(valueWithinLengthLimit)
- : valueWithinLengthLimit;
+ private static String getValidPropertyValue(String value, boolean isMax,
DataType storedType) {
+ String valueWithinLengthLimit = getValueWithinLengthLimit(value, isMax,
storedType);
+ return storedType == DataType.STRING ?
CommonsConfigurationUtils.replaceSpecialCharacterInPropertyValue(
+ valueWithinLengthLimit) : valueWithinLengthLimit;
}
/**
@@ -594,12 +596,12 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
* returns a truncated version of the string with maintaining min or max
value.
*/
@VisibleForTesting
- static String getValueWithinLengthLimit(String value, boolean isMax,
DataType dataType) {
+ static String getValueWithinLengthLimit(String value, boolean isMax,
DataType storedType) {
int length = value.length();
if (length <= METADATA_PROPERTY_LENGTH_LIMIT) {
return value;
}
- switch (dataType.getStoredType()) {
+ switch (storedType) {
case STRING:
if (isMax) {
int trimIndexValue = METADATA_PROPERTY_LENGTH_LIMIT - 1;
@@ -635,7 +637,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
return
BytesUtils.toHexString(Arrays.copyOf(BytesUtils.toBytes(value),
(METADATA_PROPERTY_LENGTH_LIMIT / 2)));
}
default:
- throw new IllegalStateException("Unsupported data type for property
value length reduction: " + dataType);
+ throw new IllegalStateException("Unsupported stored type for property
value length reduction: " + storedType);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index 82f5bff1ca..a1ba59f3b2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -21,23 +21,21 @@ package
org.apache.pinot.segment.local.segment.creator.impl.fwd;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
- * Forward index creator for raw (non-dictionary-encoded) single-value column
of variable length
- * data type (STRING,
- * BYTES).
+ * Raw (non-dictionary-encoded) forward index creator for multi-value column
of fixed length data type (INT, LONG,
+ * FLOAT, DOUBLE).
*/
public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator
{
-
private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
@@ -56,9 +54,8 @@ public class MultiValueFixedByteRawIndexCreator implements
ForwardIndexCreator {
public MultiValueFixedByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int maxNumberOfMultiValueElements)
throws IOException {
- this(baseIndexDir, compressionType, column, totalDocs, valueType,
- maxNumberOfMultiValueElements, false,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ this(baseIndexDir, compressionType, column, totalDocs, valueType,
maxNumberOfMultiValueElements, false,
+ ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
}
/**
@@ -72,20 +69,19 @@ public class MultiValueFixedByteRawIndexCreator implements
ForwardIndexCreator {
* @param deriveNumDocsPerChunk true if writer should auto-derive the number
of rows per chunk
* @param writerVersion writer format version
*/
- public MultiValueFixedByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType,
- String column, int totalDocs, DataType valueType, int
maxNumberOfMultiValueElements,
- boolean deriveNumDocsPerChunk, int writerVersion)
+ public MultiValueFixedByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType, String column,
+ int totalDocs, DataType valueType, int maxNumberOfMultiValueElements,
boolean deriveNumDocsPerChunk,
+ int writerVersion)
throws IOException {
File file = new File(baseIndexDir, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
// Store the length followed by the values
int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements *
valueType.getStoredType().size());
- int numDocsPerChunk =
- deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE /
(totalMaxLength
- +
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) :
DEFAULT_NUM_DOCS_PER_CHUNK;
- _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
- ? new VarByteChunkSVForwardIndexWriter(file, compressionType,
totalDocs, numDocsPerChunk, totalMaxLength,
- writerVersion)
- : new VarByteChunkSVForwardIndexWriterV4(file, compressionType,
TARGET_MAX_CHUNK_SIZE);
+ int numDocsPerChunk = deriveNumDocsPerChunk ? Math.max(
+ TARGET_MAX_CHUNK_SIZE / (totalMaxLength +
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1)
+ : DEFAULT_NUM_DOCS_PER_CHUNK;
+ _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ?
new VarByteChunkForwardIndexWriter(file,
+ compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
writerVersion)
+ : new VarByteChunkForwardIndexWriterV4(file, compressionType,
TARGET_MAX_CHUNK_SIZE);
_valueType = valueType;
}
@@ -105,60 +101,52 @@ public class MultiValueFixedByteRawIndexCreator
implements ForwardIndexCreator {
}
@Override
- public void putIntMV(final int[] values) {
-
- byte[] bytes = new byte[Integer.BYTES
- + values.length * Integer.BYTES]; //numValues, bytes required to store
the content
+ public void putIntMV(int[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
- for (final int value : values) {
+ for (int value : values) {
byteBuffer.putInt(value);
}
_indexWriter.putBytes(bytes);
}
@Override
- public void putLongMV(final long[] values) {
-
- byte[] bytes = new byte[Integer.BYTES
- + values.length * Long.BYTES]; //numValues, bytes required to store
the content
+ public void putLongMV(long[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
- for (final long value : values) {
+ for (long value : values) {
byteBuffer.putLong(value);
}
_indexWriter.putBytes(bytes);
}
@Override
- public void putFloatMV(final float[] values) {
-
- byte[] bytes = new byte[Integer.BYTES
- + values.length * Float.BYTES]; //numValues, bytes required to store
the content
+ public void putFloatMV(float[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
- for (final float value : values) {
+ for (float value : values) {
byteBuffer.putFloat(value);
}
_indexWriter.putBytes(bytes);
}
@Override
- public void putDoubleMV(final double[] values) {
-
- byte[] bytes = new byte[Integer.BYTES
- + values.length * Double.BYTES]; //numValues, bytes required to store
the content
+ public void putDoubleMV(double[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
- for (final double value : values) {
+ for (double value : values) {
byteBuffer.putDouble(value);
}
_indexWriter.putBytes(bytes);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 84d471d50e..85dba85ab9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -21,24 +21,23 @@ package
org.apache.pinot.segment.local.segment.creator.impl.fwd;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
- * Forward index creator for raw (non-dictionary-encoded) single-value column
of variable length
- * data type (STRING,
+ * Raw (non-dictionary-encoded) forward index creator for multi-value column
of variable length data type (STRING,
* BYTES).
*/
public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
-
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
- private final VarByteChunkSVForwardIndexWriter _indexWriter;
+ private final VarByteChunkForwardIndexWriter _indexWriter;
private final DataType _valueType;
/**
@@ -55,8 +54,8 @@ public class MultiValueVarByteRawIndexCreator implements
ForwardIndexCreator {
public MultiValueVarByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int maxRowLengthInBytes, int
maxNumberOfElements)
throws IOException {
- this(baseIndexDir, compressionType, column, totalDocs, valueType,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes,
maxNumberOfElements);
+ this(baseIndexDir, compressionType, column, totalDocs, valueType,
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
+ maxRowLengthInBytes, maxNumberOfElements);
}
/**
@@ -77,13 +76,17 @@ public class MultiValueVarByteRawIndexCreator implements
ForwardIndexCreator {
//we will prepend the actual content with numElements and length array
containing length of each element
int totalMaxLength = getTotalRowStorageBytes(maxNumberOfElements,
maxRowLengthInBytes);
- File file = new File(baseIndexDir,
- column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ File file = new File(baseIndexDir, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = Math.max(
- TARGET_MAX_CHUNK_SIZE / (totalMaxLength +
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
+ TARGET_MAX_CHUNK_SIZE / (totalMaxLength +
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
1);
- _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType,
totalDocs, numDocsPerChunk,
- totalMaxLength, writerVersion);
+ // TODO: Support V4 MV reader
+ // Currently fall back to V2 for backward compatible
+ if (writerVersion == VarByteChunkForwardIndexWriterV4.VERSION) {
+ writerVersion = 2;
+ }
+ _indexWriter = new VarByteChunkForwardIndexWriter(file, compressionType,
totalDocs, numDocsPerChunk, totalMaxLength,
+ writerVersion);
_valueType = valueType;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
index 81391d274c..c16aa57a5c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueFixedByteRawIndexCreator.java
@@ -20,22 +20,22 @@ package
org.apache.pinot.segment.local.segment.creator.impl.fwd;
import java.io.File;
import java.io.IOException;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
- * Forward index creator for raw (non-dictionary-encoded) single-value column
of fixed length data type (INT, LONG,
+ * Raw (non-dictionary-encoded) forward index creator for single-value column
of fixed length data type (INT, LONG,
* FLOAT, DOUBLE).
*/
public class SingleValueFixedByteRawIndexCreator implements
ForwardIndexCreator {
private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive
this based on metadata.
- private final FixedByteChunkSVForwardIndexWriter _indexWriter;
+ private final FixedByteChunkForwardIndexWriter _indexWriter;
private final DataType _valueType;
/**
@@ -51,7 +51,7 @@ public class SingleValueFixedByteRawIndexCreator implements
ForwardIndexCreator
public SingleValueFixedByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType)
throws IOException {
- this(baseIndexDir, compressionType, column, totalDocs, valueType,
BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ this(baseIndexDir, compressionType, column, totalDocs, valueType,
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
}
/**
@@ -70,7 +70,7 @@ public class SingleValueFixedByteRawIndexCreator implements
ForwardIndexCreator
throws IOException {
File file = new File(baseIndexDir, column +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
_indexWriter =
- new FixedByteChunkSVForwardIndexWriter(file, compressionType,
totalDocs, NUM_DOCS_PER_CHUNK, valueType.size(),
+ new FixedByteChunkForwardIndexWriter(file, compressionType, totalDocs,
NUM_DOCS_PER_CHUNK, valueType.size(),
writerVersion);
_valueType = valueType;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
index f01550ac40..9e3658802f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
@@ -22,18 +22,18 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
- * Forward index creator for raw (non-dictionary-encoded) single-value column
of variable length data type (BIG_DECIMAL,
+ * Raw (non-dictionary-encoded) forward index creator for single-value column
of variable length data type (BIG_DECIMAL,
* STRING, BYTES).
*/
public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator {
@@ -57,7 +57,7 @@ public class SingleValueVarByteRawIndexCreator implements
ForwardIndexCreator {
int totalDocs, DataType valueType, int maxLength)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType,
maxLength, false,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
}
/**
@@ -77,16 +77,15 @@ public class SingleValueVarByteRawIndexCreator implements
ForwardIndexCreator {
throws IOException {
File file = new File(baseIndexDir, column +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = deriveNumDocsPerChunk ?
getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
- _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
- ? new VarByteChunkSVForwardIndexWriter(file, compressionType,
totalDocs, numDocsPerChunk, maxLength,
- writerVersion)
- : new VarByteChunkSVForwardIndexWriterV4(file, compressionType,
TARGET_MAX_CHUNK_SIZE);
+ _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ?
new VarByteChunkForwardIndexWriter(file,
+ compressionType, totalDocs, numDocsPerChunk, maxLength, writerVersion)
+ : new VarByteChunkForwardIndexWriterV4(file, compressionType,
TARGET_MAX_CHUNK_SIZE);
_valueType = valueType;
}
@VisibleForTesting
public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
- int overheadPerEntry = lengthOfLongestEntry +
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ int overheadPerEntry = lengthOfLongestEntry +
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index 9cb35aa36f..a57158b44f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -33,6 +33,7 @@ import
org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
public class ForwardIndexCreatorFactory {
@@ -41,107 +42,90 @@ public class ForwardIndexCreatorFactory {
public static ForwardIndexCreator createIndexCreator(IndexCreationContext
context, ForwardIndexConfig indexConfig)
throws Exception {
- String colName = context.getFieldSpec().getName();
+ File indexDir = context.getIndexDir();
+ FieldSpec fieldSpec = context.getFieldSpec();
+ String columnName = fieldSpec.getName();
+ int numTotalDocs = context.getTotalDocs();
- if (!context.hasDictionary()) {
+ if (context.hasDictionary()) {
+ // Dictionary enabled columns
+ int cardinality = context.getCardinality();
+ if (fieldSpec.isSingleValueField()) {
+ if (context.isSorted()) {
+ return new SingleValueSortedForwardIndexCreator(indexDir,
columnName, cardinality);
+ } else {
+ return new SingleValueUnsortedForwardIndexCreator(indexDir,
columnName, cardinality, numTotalDocs);
+ }
+ } else {
+ return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName,
cardinality, numTotalDocs,
+ context.getTotalNumberOfEntries());
+ }
+ } else {
+ // Dictionary disabled columns
+ DataType storedType = fieldSpec.getDataType().getStoredType();
ChunkCompressionType chunkCompressionType =
indexConfig.getChunkCompressionType();
if (chunkCompressionType == null) {
- chunkCompressionType =
ForwardIndexType.getDefaultCompressionType(context.getFieldSpec().getFieldType());
+ chunkCompressionType =
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
}
-
- // Dictionary disabled columns
boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk();
int writerVersion = indexConfig.getRawIndexWriterVersion();
- if (context.getFieldSpec().isSingleValueField()) {
- return getRawIndexCreatorForSVColumn(context.getIndexDir(),
chunkCompressionType, colName,
- context.getFieldSpec().getDataType().getStoredType(),
- context.getTotalDocs(), context.getLengthOfLongestEntry(),
deriveNumDocsPerChunk, writerVersion);
+ if (fieldSpec.isSingleValueField()) {
+ return getRawIndexCreatorForSVColumn(indexDir, chunkCompressionType,
columnName, storedType, numTotalDocs,
+ context.getLengthOfLongestEntry(), deriveNumDocsPerChunk,
writerVersion);
} else {
- return getRawIndexCreatorForMVColumn(context.getIndexDir(),
chunkCompressionType, colName,
- context.getFieldSpec().getDataType().getStoredType(),
- context.getTotalDocs(),
context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk,
writerVersion,
+ return getRawIndexCreatorForMVColumn(indexDir, chunkCompressionType,
columnName, storedType, numTotalDocs,
+ context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk,
writerVersion,
context.getMaxRowLengthInBytes());
}
- } else {
- // Dictionary enabled columns
- if (context.getFieldSpec().isSingleValueField()) {
- if (context.isSorted()) {
- return new
SingleValueSortedForwardIndexCreator(context.getIndexDir(), colName,
- context.getCardinality());
- } else {
- return new
SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
- context.getCardinality(), context.getTotalDocs());
- }
- } else {
- return new
MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
- context.getCardinality(), context.getTotalDocs(),
context.getTotalNumberOfEntries());
- }
}
}
/**
* Helper method to build the raw index creator for the column.
* Assumes that column to be indexed is single valued.
- *
- * @param file Output index file
- * @param column Column name
- * @param totalDocs Total number of documents to index
- * @param lengthOfLongestEntry Length of longest entry
- * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows per chunk
- * @param writerVersion version to use for the raw index writer
- * @return raw index creator
*/
- public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
ChunkCompressionType compressionType,
- String column, FieldSpec.DataType dataType, int totalDocs, int
lengthOfLongestEntry,
- boolean deriveNumDocsPerChunk, int writerVersion)
+ public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File
indexDir, ChunkCompressionType compressionType,
+ String column, DataType storedType, int numTotalDocs, int
lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
+ int writerVersion)
throws IOException {
- switch (dataType.getStoredType()) {
+ switch (storedType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
- return new SingleValueFixedByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
+ return new SingleValueFixedByteRawIndexCreator(indexDir,
compressionType, column, numTotalDocs, storedType,
writerVersion);
case BIG_DECIMAL:
case STRING:
case BYTES:
- return new SingleValueVarByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
+ return new SingleValueVarByteRawIndexCreator(indexDir,
compressionType, column, numTotalDocs, storedType,
lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
default:
- throw new UnsupportedOperationException("Data type not supported for
raw indexing: " + dataType);
+ throw new IllegalStateException("Unsupported stored type: " +
storedType);
}
}
/**
* Helper method to build the raw index creator for the column.
* Assumes that column to be indexed is multi-valued.
- *
- * @param file Output index file
- * @param column Column name
- * @param totalDocs Total number of documents to index
- * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows
- * per chunk
- * @param writerVersion version to use for the raw index writer
- * @param maxRowLengthInBytes the length of the longest row in bytes
- * @return raw index creator
*/
- public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file,
ChunkCompressionType compressionType,
- String column, FieldSpec.DataType dataType, final int totalDocs, int
maxNumberOfMultiValueElements,
+ public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File
indexDir, ChunkCompressionType compressionType,
+ String column, DataType storedType, int numTotalDocs, int
maxNumberOfMultiValueElements,
boolean deriveNumDocsPerChunk, int writerVersion, int
maxRowLengthInBytes)
throws IOException {
- switch (dataType.getStoredType()) {
+ switch (storedType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
- return new MultiValueFixedByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
+ return new MultiValueFixedByteRawIndexCreator(indexDir,
compressionType, column, numTotalDocs, storedType,
maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
writerVersion);
case STRING:
case BYTES:
- return new MultiValueVarByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType, writerVersion,
- maxRowLengthInBytes, maxNumberOfMultiValueElements);
+ return new MultiValueVarByteRawIndexCreator(indexDir, compressionType,
column, numTotalDocs, storedType,
+ writerVersion, maxRowLengthInBytes, maxNumberOfMultiValueElements);
default:
- throw new UnsupportedOperationException("Data type not supported for
raw indexing: " + dataType);
+ throw new IllegalStateException("Unsupported stored type: " +
storedType);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index 283f569f40..ecaaf875a5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -19,7 +19,7 @@
package org.apache.pinot.segment.local.segment.index.forward;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
@@ -37,11 +37,10 @@ import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
-class ForwardIndexReaderFactory extends
IndexReaderFactory.Default<ForwardIndexConfig, ForwardIndexReader> {
-
+public class ForwardIndexReaderFactory extends
IndexReaderFactory.Default<ForwardIndexConfig, ForwardIndexReader> {
public static final ForwardIndexReaderFactory INSTANCE = new
ForwardIndexReaderFactory();
@Override
@@ -69,22 +68,26 @@ class ForwardIndexReaderFactory extends
IndexReaderFactory.Default<ForwardIndexC
metadata.getBitsPerElement());
}
} else {
- FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
- if (metadata.isSingleValue()) {
- int version = dataBuffer.getInt(0);
- if (storedType.isFixedWidth()) {
- return version >= FixedBytePower2ChunkSVForwardIndexReader.VERSION
- ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer,
storedType)
- : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
- }
- if (version >= VarByteChunkSVForwardIndexWriterV4.VERSION) {
- return new VarByteChunkSVForwardIndexReaderV4(dataBuffer,
storedType);
- }
- return new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
+ return createRawIndexReader(dataBuffer,
metadata.getDataType().getStoredType(), metadata.isSingleValue());
+ }
+ }
+
+ public static ForwardIndexReader createRawIndexReader(PinotDataBuffer
dataBuffer, DataType storedType,
+ boolean isSingleValue) {
+ int version = dataBuffer.getInt(0);
+ if (isSingleValue) {
+ if (storedType.isFixedWidth()) {
+ return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION
+ ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer,
storedType)
+ : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
} else {
- return storedType.isFixedWidth() ? new
FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
- : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
+ return version == VarByteChunkForwardIndexWriterV4.VERSION ? new
VarByteChunkSVForwardIndexReaderV4(dataBuffer,
+ storedType) : new VarByteChunkSVForwardIndexReader(dataBuffer,
storedType);
}
+ } else {
+ // TODO: Support V4 MV reader
+ return storedType.isFixedWidth() ? new
FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
+ : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 91b0d46b0a..5b5ad75de6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -331,6 +331,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
}
} else if (existingNoDictColumns.contains(column) && !newIsDict) {
// Both existing and new column is RAW forward index encoded. Check if
compression needs to be changed.
+ // TODO: Also check if raw index version needs to be changed
if (shouldChangeCompressionType(column, segmentReader)) {
columnOperationsMap.put(column,
Collections.singletonList(Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 406fc5c42b..6d3cc83c10 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -24,20 +24,18 @@ import java.util.List;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
+import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
@@ -139,47 +137,48 @@ public class ColumnMinMaxValueGenerator {
return;
}
- DataType dataType = columnMetadata.getDataType().getStoredType();
+ DataType dataType = columnMetadata.getDataType();
+ DataType storedType = dataType.getStoredType();
if (columnMetadata.hasDictionary()) {
PinotDataBuffer dictionaryBuffer =
_segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary());
int length = columnMetadata.getCardinality();
- switch (dataType) {
+ switch (storedType) {
case INT:
try (IntDictionary intDictionary = new
IntDictionary(dictionaryBuffer, length)) {
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- intDictionary.getStringValue(0),
intDictionary.getStringValue(length - 1), dataType);
+ intDictionary.getStringValue(0),
intDictionary.getStringValue(length - 1), storedType);
}
break;
case LONG:
try (LongDictionary longDictionary = new
LongDictionary(dictionaryBuffer, length)) {
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- longDictionary.getStringValue(0),
longDictionary.getStringValue(length - 1), dataType);
+ longDictionary.getStringValue(0),
longDictionary.getStringValue(length - 1), storedType);
}
break;
case FLOAT:
try (FloatDictionary floatDictionary = new
FloatDictionary(dictionaryBuffer, length)) {
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- floatDictionary.getStringValue(0),
floatDictionary.getStringValue(length - 1), dataType);
+ floatDictionary.getStringValue(0),
floatDictionary.getStringValue(length - 1), storedType);
}
break;
case DOUBLE:
try (DoubleDictionary doubleDictionary = new
DoubleDictionary(dictionaryBuffer, length)) {
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- doubleDictionary.getStringValue(0),
doubleDictionary.getStringValue(length - 1), dataType);
+ doubleDictionary.getStringValue(0),
doubleDictionary.getStringValue(length - 1), storedType);
}
break;
case STRING:
try (StringDictionary stringDictionary = new
StringDictionary(dictionaryBuffer, length,
columnMetadata.getColumnMaxLength())) {
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- stringDictionary.getStringValue(0),
stringDictionary.getStringValue(length - 1), dataType);
+ stringDictionary.getStringValue(0),
stringDictionary.getStringValue(length - 1), storedType);
}
break;
case BYTES:
try (BytesDictionary bytesDictionary = new
BytesDictionary(dictionaryBuffer, length,
columnMetadata.getColumnMaxLength())) {
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- bytesDictionary.getStringValue(0),
bytesDictionary.getStringValue(length - 1), dataType);
+ bytesDictionary.getStringValue(0),
bytesDictionary.getStringValue(length - 1), storedType);
}
break;
default:
@@ -188,129 +187,117 @@ public class ColumnMinMaxValueGenerator {
} else {
// setting min/max for non-dictionary columns.
int numDocs = columnMetadata.getTotalDocs();
- boolean isSingleValueField =
_segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField();
- PinotDataBuffer forwardBuffer = _segmentWriter.getIndexFor(columnName,
StandardIndexes.forward());
- switch (dataType) {
- case INT: {
- int min = Integer.MAX_VALUE;
- int max = Integer.MIN_VALUE;
- if (isSingleValueField) {
- try (FixedByteChunkSVForwardIndexReader rawIndexReader = new
FixedByteChunkSVForwardIndexReader(
- forwardBuffer, DataType.INT); ChunkReaderContext readerContext
= rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- int value = rawIndexReader.getInt(docId, readerContext);
+ PinotDataBuffer rawIndexBuffer = _segmentWriter.getIndexFor(columnName,
StandardIndexes.forward());
+ boolean isSingleValue =
_segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField();
+ try (
+ ForwardIndexReader rawIndexReader =
ForwardIndexReaderFactory.createRawIndexReader(rawIndexBuffer, storedType,
+ isSingleValue); ForwardIndexReaderContext readerContext =
rawIndexReader.createContext()) {
+ switch (storedType) {
+ case INT: {
+ int min = Integer.MAX_VALUE;
+ int max = Integer.MIN_VALUE;
+ if (isSingleValue) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ int value = rawIndexReader.getInt(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ int[] values = rawIndexReader.getIntMV(docId, readerContext);
+ for (int value : values) {
min = Math.min(min, value);
max = Math.max(max, value);
}
+ }
}
- } else {
- try (FixedByteChunkMVForwardIndexReader rawIndexReader = new
FixedByteChunkMVForwardIndexReader(
- forwardBuffer, DataType.INT); ChunkReaderContext readerContext
= rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- int[] value = rawIndexReader.getIntMV(docId, readerContext);
- for (int i = 0; i < value.length; i++) {
- min = Math.min(min, value[i]);
- max = Math.max(max, value[i]);
- }
- }
- }
+
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName, String.valueOf(min),
+ String.valueOf(max), storedType);
+ break;
}
-
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- String.valueOf(min), String.valueOf(max), dataType);
- }
- break;
- case LONG: {
- long min = Long.MAX_VALUE;
- long max = Long.MIN_VALUE;
- if (isSingleValueField) {
- try (FixedByteChunkSVForwardIndexReader rawIndexReader = new
FixedByteChunkSVForwardIndexReader(
- forwardBuffer, DataType.LONG); ChunkReaderContext
readerContext = rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- long value = rawIndexReader.getLong(docId, readerContext);
+ case LONG: {
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ if (isSingleValue) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ long value = rawIndexReader.getLong(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ long[] values = rawIndexReader.getLongMV(docId, readerContext);
+ for (long value : values) {
min = Math.min(min, value);
max = Math.max(max, value);
}
+ }
}
- } else {
- try (FixedByteChunkMVForwardIndexReader rawIndexReader = new
FixedByteChunkMVForwardIndexReader(
- forwardBuffer, DataType.LONG); ChunkReaderContext
readerContext = rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- long[] value = rawIndexReader.getLongMV(docId,
readerContext);
- for (int i = 0; i < value.length; i++) {
- min = Math.min(min, value[i]);
- max = Math.max(max, value[i]);
- }
- }
- }
+
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName, String.valueOf(min),
+ String.valueOf(max), storedType);
+ break;
}
-
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- String.valueOf(min), String.valueOf(max), dataType);
- }
- break;
- case FLOAT: {
- float min = Float.MAX_VALUE;
- float max = Float.MIN_VALUE;
- if (isSingleValueField) {
- try (FixedByteChunkSVForwardIndexReader rawIndexReader = new
FixedByteChunkSVForwardIndexReader(
- forwardBuffer, DataType.FLOAT); ChunkReaderContext
readerContext = rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- float value = rawIndexReader.getFloat(docId, readerContext);
+ case FLOAT: {
+ float min = Float.POSITIVE_INFINITY;
+ float max = Float.NEGATIVE_INFINITY;
+ if (isSingleValue) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ float value = rawIndexReader.getFloat(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ float[] values = rawIndexReader.getFloatMV(docId,
readerContext);
+ for (float value : values) {
min = Math.min(min, value);
max = Math.max(max, value);
}
+ }
}
- } else {
- try (FixedByteChunkMVForwardIndexReader rawIndexReader = new
FixedByteChunkMVForwardIndexReader(
- forwardBuffer, DataType.FLOAT); ChunkReaderContext
readerContext = rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- float[] value = rawIndexReader.getFloatMV(docId,
readerContext);
- for (int i = 0; i < value.length; i++) {
- min = Math.min(min, value[i]);
- max = Math.max(max, value[i]);
- }
- }
- }
+
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName, String.valueOf(min),
+ String.valueOf(max), storedType);
+ break;
}
-
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- String.valueOf(min), String.valueOf(max), dataType);
- }
- break;
- case DOUBLE: {
- double min = Double.MAX_VALUE;
- double max = Double.MIN_VALUE;
- if (isSingleValueField) {
- try (FixedByteChunkSVForwardIndexReader rawIndexReader = new
FixedByteChunkSVForwardIndexReader(
- forwardBuffer, DataType.DOUBLE); ChunkReaderContext
readerContext = rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- double value = rawIndexReader.getDouble(docId,
readerContext);
+ case DOUBLE: {
+ double min = Double.POSITIVE_INFINITY;
+ double max = Double.NEGATIVE_INFINITY;
+ if (isSingleValue) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ double value = rawIndexReader.getDouble(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ double[] values = rawIndexReader.getDoubleMV(docId,
readerContext);
+ for (double value : values) {
min = Math.min(min, value);
max = Math.max(max, value);
}
+ }
}
- } else {
- try (FixedByteChunkMVForwardIndexReader rawIndexReader = new
FixedByteChunkMVForwardIndexReader(
- forwardBuffer, DataType.DOUBLE); ChunkReaderContext
readerContext = rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- double[] value = rawIndexReader.getDoubleMV(docId,
readerContext);
- for (int i = 0; i < value.length; i++) {
- min = Math.min(min, value[i]);
- max = Math.max(max, value[i]);
- }
- }
- }
+
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName, String.valueOf(min),
+ String.valueOf(max), storedType);
+ break;
}
-
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- String.valueOf(min), String.valueOf(max), dataType);
- }
- break;
- case STRING: {
- String min = null;
- String max = null;
- if (isSingleValueField) {
- try (VarByteChunkSVForwardIndexReader rawIndexReader = new
VarByteChunkSVForwardIndexReader(forwardBuffer,
- DataType.STRING); ChunkReaderContext readerContext =
rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- String value = rawIndexReader.getString(docId,
readerContext);
+ case STRING: {
+ String min = null;
+ String max = null;
+ if (isSingleValue) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ String value = rawIndexReader.getString(docId, readerContext);
+ if (min == null || StringUtils.compare(min, value) > 0) {
+ min = value;
+ }
+ if (max == null || StringUtils.compare(max, value) < 0) {
+ max = value;
+ }
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ String[] values = rawIndexReader.getStringMV(docId,
readerContext);
+ for (String value : values) {
if (min == null || StringUtils.compare(min, value) > 0) {
min = value;
}
@@ -318,34 +305,28 @@ public class ColumnMinMaxValueGenerator {
max = value;
}
}
+ }
}
- } else {
- try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(forwardBuffer,
- DataType.STRING); ChunkReaderContext readerContext =
rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- String[] value = rawIndexReader.getStringMV(docId,
readerContext);
- for (int i = 0; i < value.length; i++) {
- if (min == null || StringUtils.compare(min, value[i]) > 0)
{
- min = value[i];
- }
- if (max == null || StringUtils.compare(max, value[i]) < 0)
{
- max = value[i];
- }
- }
- }
- }
- }
-
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName, min, max, dataType);
+
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName, min, max, storedType);
+ break;
}
- break;
- case BYTES: {
- byte[] min = null;
- byte[] max = null;
- if (isSingleValueField) {
- try (VarByteChunkSVForwardIndexReader rawIndexReader = new
VarByteChunkSVForwardIndexReader(forwardBuffer,
- DataType.BYTES); ChunkReaderContext readerContext =
rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- byte[] value = rawIndexReader.getBytes(docId, readerContext);
+ case BYTES: {
+ byte[] min = null;
+ byte[] max = null;
+ if (isSingleValue) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ byte[] value = rawIndexReader.getBytes(docId, readerContext);
+ if (min == null || ByteArray.compare(value, min) > 0) {
+ min = value;
+ }
+ if (max == null || ByteArray.compare(value, max) < 0) {
+ max = value;
+ }
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ byte[][] values = rawIndexReader.getBytesMV(docId,
readerContext);
+ for (byte[] value : values) {
if (min == null || ByteArray.compare(value, min) > 0) {
min = value;
}
@@ -353,29 +334,15 @@ public class ColumnMinMaxValueGenerator {
max = value;
}
}
+ }
}
- } else {
- try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(forwardBuffer,
- DataType.BYTES); ChunkReaderContext readerContext =
rawIndexReader.createContext()) {
- for (int docId = 0; docId < numDocs; docId++) {
- byte[][] value = rawIndexReader.getBytesMV(docId,
readerContext);
- for (int i = 0; i < value.length; i++) {
- if (min == null || ByteArray.compare(value[i], min) > 0) {
- min = value[i];
- }
- if (max == null || ByteArray.compare(value[i], max) < 0) {
- max = value[i];
- }
- }
- }
- }
- }
-
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
- BytesUtils.toHexString(min), BytesUtils.toHexString(max),
dataType);
+
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties,
columnName,
+ BytesUtils.toHexString(min), BytesUtils.toHexString(max),
storedType);
+ break;
}
- break;
- default:
- throw new IllegalStateException("Unsupported data type: " + dataType
+ " for column: " + columnName);
+ default:
+ throw new IllegalStateException("Unsupported data type: " +
dataType + " for column: " + columnName);
+ }
}
}
_minMaxValueAdded = true;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index e37fb4a987..5e05346bbc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -26,7 +26,6 @@ import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -55,7 +54,7 @@ public abstract class BaseChunkForwardIndexReader implements
ForwardIndexReader<
protected final PinotDataBuffer _rawData;
protected final boolean _isSingleValue;
- public BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType
storedType, boolean isSingleValue) {
+ protected BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType
storedType, boolean isSingleValue) {
_dataBuffer = dataBuffer;
_storedType = storedType;
@@ -92,7 +91,7 @@ public abstract class BaseChunkForwardIndexReader implements
ForwardIndexReader<
_chunkDecompressor =
ChunkCompressorFactory.getDecompressor(_compressionType);
}
- _headerEntryChunkOffsetSize =
BaseChunkSVForwardIndexWriter.getHeaderEntryChunkOffsetSize(version);
+ _headerEntryChunkOffsetSize = version <= 2 ? Integer.BYTES : Long.BYTES;
// Slice out the header from the data buffer.
int dataHeaderLength = _numChunks * _headerEntryChunkOffsetSize;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
index acbec1a98d..cc44bb341b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
@@ -20,19 +20,18 @@ package
org.apache.pinot.segment.local.segment.index.readers.forward;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
- * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader
for values of
- * fixed length data type (INT, LONG, FLOAT, DOUBLE).
- * <p>For data layout, please refer to the documentation for {@link
VarByteChunkSVForwardIndexWriter}
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader
for values of fixed length data type (INT,
+ * LONG, FLOAT, DOUBLE).
+ * <p>For data layout, please refer to the documentation for {@link
VarByteChunkForwardIndexWriter}
*/
public final class FixedByteChunkMVForwardIndexReader extends
BaseChunkForwardIndexReader {
-
- private static final int ROW_OFFSET_SIZE =
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ private static final int ROW_OFFSET_SIZE =
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
private final int _maxChunkSize;
@@ -206,8 +205,7 @@ public final class FixedByteChunkMVForwardIndexReader
extends BaseChunkForwardIn
// Last row in the last chunk
return _dataBuffer.size();
} else {
- int valueEndOffsetInChunk = _dataBuffer
- .getInt(chunkStartOffset + (long) (chunkRowId + 1) *
ROW_OFFSET_SIZE);
+ int valueEndOffsetInChunk = _dataBuffer.getInt(chunkStartOffset +
(long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
if (valueEndOffsetInChunk == 0) {
// Last row in the last chunk (chunk is incomplete, which stores 0
as the offset for the absent rows)
return _dataBuffer.size();
@@ -220,8 +218,7 @@ public final class FixedByteChunkMVForwardIndexReader
extends BaseChunkForwardIn
// Last row in the chunk
return getChunkPosition(chunkId + 1);
} else {
- return chunkStartOffset + _dataBuffer
- .getInt(chunkStartOffset + (long) (chunkRowId + 1) *
ROW_OFFSET_SIZE);
+ return chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long)
(chunkRowId + 1) * ROW_OFFSET_SIZE);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
index c394d0021a..4bda3f8090 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
@@ -20,7 +20,7 @@ package
org.apache.pinot.segment.local.segment.index.readers.forward;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
-import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
* Chunk-based single-value raw (non-dictionary-encoded) forward index reader
for values of fixed length data type (INT,
* LONG, FLOAT, DOUBLE).
- * <p>For data layout, please refer to the documentation for {@link
FixedByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link
FixedByteChunkForwardIndexWriter}
*/
public final class FixedByteChunkSVForwardIndexReader extends
BaseChunkForwardIndexReader {
private final int _chunkSize;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
index 35c857219c..1fc567cfd4 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
@@ -20,7 +20,7 @@ package
org.apache.pinot.segment.local.segment.index.readers.forward;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
-import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
* Chunk-based single-value raw (non-dictionary-encoded) forward index reader
for values of fixed length data type (INT,
* LONG, FLOAT, DOUBLE).
- * <p>For data layout, please refer to the documentation for {@link
FixedByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link
FixedByteChunkForwardIndexWriter}
*/
public final class FixedBytePower2ChunkSVForwardIndexReader extends
BaseChunkForwardIndexReader {
public static final int VERSION = 4;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
index c445312f6f..235132d44c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -21,20 +21,18 @@ package
org.apache.pinot.segment.local.segment.index.readers.forward;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+
/**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader
for values of
- * variable
- * length data type
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader
for values of variable length data type
* (STRING, BYTES).
- * <p>For data layout, please refer to the documentation for {@link
VarByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link
VarByteChunkForwardIndexWriter}
*/
public final class VarByteChunkMVForwardIndexReader extends
BaseChunkForwardIndexReader {
-
- private static final int ROW_OFFSET_SIZE =
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ private static final int ROW_OFFSET_SIZE =
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
private final int _maxChunkSize;
@@ -54,8 +52,7 @@ public final class VarByteChunkMVForwardIndexReader extends
BaseChunkForwardInde
}
@Override
- public int getStringMV(final int docId, final String[] valueBuffer,
- final ChunkReaderContext context) {
+ public int getStringMV(final int docId, final String[] valueBuffer, final
ChunkReaderContext context) {
byte[] compressedBytes;
if (_isCompressed) {
compressedBytes = getBytesCompressed(docId, context);
@@ -100,8 +97,7 @@ public final class VarByteChunkMVForwardIndexReader extends
BaseChunkForwardInde
}
@Override
- public int getBytesMV(final int docId, final byte[][] valueBuffer,
- final ChunkReaderContext context) {
+ public int getBytesMV(final int docId, final byte[][] valueBuffer, final
ChunkReaderContext context) {
byte[] compressedBytes;
if (_isCompressed) {
compressedBytes = getBytesCompressed(docId, context);
@@ -229,8 +225,7 @@ public final class VarByteChunkMVForwardIndexReader extends
BaseChunkForwardInde
// Last row in the last chunk
return _dataBuffer.size();
} else {
- int valueEndOffsetInChunk = _dataBuffer
- .getInt(chunkStartOffset + (long) (chunkRowId + 1) *
ROW_OFFSET_SIZE);
+ int valueEndOffsetInChunk = _dataBuffer.getInt(chunkStartOffset +
(long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
if (valueEndOffsetInChunk == 0) {
// Last row in the last chunk (chunk is incomplete, which stores 0
as the offset for the absent rows)
return _dataBuffer.size();
@@ -243,8 +238,7 @@ public final class VarByteChunkMVForwardIndexReader extends
BaseChunkForwardInde
// Last row in the chunk
return getChunkPosition(chunkId + 1);
} else {
- return chunkStartOffset + _dataBuffer
- .getInt(chunkStartOffset + (long) (chunkRowId + 1) *
ROW_OFFSET_SIZE);
+ return chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long)
(chunkRowId + 1) * ROW_OFFSET_SIZE);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index 4a69f73463..8728dd53a8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -21,7 +21,7 @@ package
org.apache.pinot.segment.local.segment.index.readers.forward;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -32,10 +32,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Chunk-based single-value raw (non-dictionary-encoded) forward index reader
for values of variable length data type
* (BIG_DECIMAL, STRING, BYTES).
- * <p>For data layout, please refer to the documentation for {@link
VarByteChunkSVForwardIndexWriter}
+ * <p>For data layout, please refer to the documentation for {@link
VarByteChunkForwardIndexWriter}
*/
public final class VarByteChunkSVForwardIndexReader extends
BaseChunkForwardIndexReader {
- private static final int ROW_OFFSET_SIZE =
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ private static final int ROW_OFFSET_SIZE =
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
private final int _maxChunkSize;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
index 919e737f36..c1d842b23c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
@@ -18,14 +18,14 @@
*/
package org.apache.pinot.segment.local.segment.index.readers.forward;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
-import javax.annotation.Nullable;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -38,11 +38,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader
for values of variable length data type
+ * (BIG_DECIMAL, STRING, BYTES).
+ * <p>For data layout, please refer to the documentation for {@link
VarByteChunkForwardIndexWriterV4}
+ */
public class VarByteChunkSVForwardIndexReaderV4
implements
ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
-
private static final int METADATA_ENTRY_SIZE = 8;
private final FieldSpec.DataType _storedType;
@@ -54,10 +57,8 @@ public class VarByteChunkSVForwardIndexReaderV4
private final PinotDataBuffer _chunks;
public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer,
FieldSpec.DataType storedType) {
- if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
- throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
- + VarByteChunkSVForwardIndexWriterV4.VERSION);
- }
+ int version = dataBuffer.getInt(0);
+ Preconditions.checkState(version ==
VarByteChunkForwardIndexWriterV4.VERSION, "Illegal index version: %s", version);
_storedType = storedType;
_targetDecompressedChunkSize = dataBuffer.getInt(4);
_chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
@@ -83,6 +84,20 @@ public class VarByteChunkSVForwardIndexReaderV4
return _storedType;
}
+ @Override
+ public ChunkCompressionType getCompressionType() {
+ // NOTE: Treat LZ4_LENGTH_PREFIXED as LZ4 because
VarByteChunkForwardIndexWriterV4 implicitly overrides it
+ return _chunkCompressionType == ChunkCompressionType.LZ4_LENGTH_PREFIXED ?
ChunkCompressionType.LZ4
+ : _chunkCompressionType;
+ }
+
+ @Override
+ public ReaderContext createContext() {
+ return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH ? new
UncompressedReaderContext(_chunks,
+ _metadata) : new CompressedReaderContext(_metadata, _chunks,
_chunkDecompressor, _chunkCompressionType,
+ _targetDecompressedChunkSize);
+ }
+
@Override
public BigDecimal getBigDecimal(int docId, ReaderContext context) {
return BigDecimalUtils.deserialize(context.getValue(docId));
@@ -98,15 +113,6 @@ public class VarByteChunkSVForwardIndexReaderV4
return context.getValue(docId);
}
- @Nullable
- @Override
- public ReaderContext createContext() {
- return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH
- ? new UncompressedReaderContext(_chunks, _metadata)
- : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor,
_chunkCompressionType,
- _targetDecompressedChunkSize);
- }
-
@Override
public void close()
throws IOException {
@@ -209,9 +215,8 @@ public class VarByteChunkSVForwardIndexReaderV4
protected byte[] readSmallUncompressedValue(int docId) {
int index = docId - _docIdOffset;
int offset = _chunk.getInt((index + 1) * Integer.BYTES);
- int nextOffset = index == _numDocsInCurrentChunk - 1
- ? _chunk.limit()
- : _chunk.getInt((index + 2) * Integer.BYTES);
+ int nextOffset =
+ index == _numDocsInCurrentChunk - 1 ? _chunk.limit() :
_chunk.getInt((index + 2) * Integer.BYTES);
byte[] bytes = new byte[nextOffset - offset];
_chunk.position(offset);
_chunk.get(bytes);
@@ -220,8 +225,7 @@ public class VarByteChunkSVForwardIndexReaderV4
}
@Override
- public void close()
- throws IOException {
+ public void close() {
}
}
@@ -257,8 +261,7 @@ public class VarByteChunkSVForwardIndexReaderV4
protected byte[] readSmallUncompressedValue(int docId) {
int index = docId - _docIdOffset;
int offset = _decompressedBuffer.getInt((index + 1) * Integer.BYTES);
- int nextOffset = index == _numDocsInCurrentChunk - 1
- ? _decompressedBuffer.limit()
+ int nextOffset = index == _numDocsInCurrentChunk - 1 ?
_decompressedBuffer.limit()
: _decompressedBuffer.getInt((index + 2) * Integer.BYTES);
byte[] bytes = new byte[nextOffset - offset];
_decompressedBuffer.position(offset);
@@ -295,8 +298,7 @@ public class VarByteChunkSVForwardIndexReaderV4
}
@Override
- public void close()
- throws IOException {
+ public void close() {
CleanerUtil.cleanQuietly(_decompressedBuffer);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
index 0c666703c0..2b09994c97 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
@@ -24,16 +24,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.startree.OffHeapStarTree;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.segment.spi.index.startree.StarTree;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
@@ -84,12 +83,8 @@ public class StarTreeLoaderUtils {
PinotDataBuffer forwardIndexDataBuffer =
indexReader.getIndexFor(metric, StandardIndexes.forward());
DataType dataType =
ValueAggregatorFactory.getAggregatedValueType(functionColumnPair.getFunctionType());
FieldSpec fieldSpec = new MetricFieldSpec(metric, dataType);
- BaseChunkForwardIndexReader forwardIndex;
- if (dataType == DataType.BYTES) {
- forwardIndex = new
VarByteChunkSVForwardIndexReader(forwardIndexDataBuffer, DataType.BYTES);
- } else {
- forwardIndex = new
FixedByteChunkSVForwardIndexReader(forwardIndexDataBuffer, dataType);
- }
+ ForwardIndexReader<?> forwardIndex =
+
ForwardIndexReaderFactory.createRawIndexReader(forwardIndexDataBuffer,
dataType.getStoredType(), true);
dataSourceMap.put(metric, new StarTreeDataSource(fieldSpec, numDocs,
forwardIndex, null));
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
index 48dad11495..e3d89157db 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
@@ -46,7 +46,6 @@ import static org.testng.Assert.assertEquals;
public class VarByteChunkSVForwardIndexWriterTest {
-
private static final File OUTPUT_DIR =
new File(FileUtils.getTempDirectory(),
VarByteChunkSVForwardIndexWriterTest.class.getSimpleName());
@@ -67,31 +66,26 @@ public class VarByteChunkSVForwardIndexWriterTest {
int[] numbersOfDocs = {10, 1000};
int[][] entryLengths = {{1, 1}, {0, 10}, {0, 100}, {100, 100}, {900,
1000}};
int[] versions = {2, 3};
- return Arrays.stream(ChunkCompressionType.values())
- .flatMap(chunkCompressionType ->
IntStream.of(versions).boxed().flatMap(
- version -> IntStream.of(numbersOfDocs).boxed()
- .flatMap(totalDocs ->
IntStream.of(numDocsPerChunks).boxed().flatMap(
- numDocsPerChunk -> Arrays.stream(entryLengths).map(
- lengths -> new Object[]{
- chunkCompressionType, totalDocs, numDocsPerChunk,
- lengths, version
- })))))
- .toArray(Object[][]::new);
+ return
Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType ->
IntStream.of(versions).boxed()
+ .flatMap(version -> IntStream.of(numbersOfDocs).boxed().flatMap(
+ totalDocs -> IntStream.of(numDocsPerChunks).boxed()
+ .flatMap(numDocsPerChunk ->
Arrays.stream(entryLengths).map(lengths -> new Object[]{
+ chunkCompressionType, totalDocs, numDocsPerChunk, lengths,
version
+ }))))).toArray(Object[][]::new);
}
@Test(dataProvider = "params")
- public void testPutStrings(ChunkCompressionType compressionType, int
totalDocs, int numDocsPerChunk,
- int[] lengths, int version)
+ public void testPutStrings(ChunkCompressionType compressionType, int
totalDocs, int numDocsPerChunk, int[] lengths,
+ int version)
throws IOException {
String column = "testCol-" + UUID.randomUUID();
File file = new File(OUTPUT_DIR, column +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
List<String[]> arrays = generateStringArrays(totalDocs, lengths, 50);
- int maxEntryLengthInBytes =
- arrays.stream().mapToInt(array -> Integer.BYTES +
Arrays.stream(array).mapToInt(
- str -> Integer.BYTES +
str.getBytes(UTF_8).length).sum()).max().orElse(0);
- try (
- VarByteChunkSVForwardIndexWriter writer = new
VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
- numDocsPerChunk, maxEntryLengthInBytes, version)) {
+ int maxEntryLengthInBytes = arrays.stream().mapToInt(
+ array -> Integer.BYTES + Arrays.stream(array).mapToInt(
+ str -> Integer.BYTES +
str.getBytes(UTF_8).length).sum()).max().orElse(0);
+ try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
+ numDocsPerChunk, maxEntryLengthInBytes, version)) {
for (String[] array : arrays) {
writer.putStrings(array);
}
@@ -116,18 +110,17 @@ public class VarByteChunkSVForwardIndexWriterTest {
}
@Test(dataProvider = "params")
- public void testPutBytes(ChunkCompressionType compressionType, int
totalDocs, int numDocsPerChunk,
- int[] lengths, int version)
+ public void testPutBytes(ChunkCompressionType compressionType, int
totalDocs, int numDocsPerChunk, int[] lengths,
+ int version)
throws IOException {
String column = "testCol-" + UUID.randomUUID();
File file = new File(OUTPUT_DIR, column +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
List<String[]> arrays = generateStringArrays(totalDocs, lengths, 50);
- int maxEntryLengthInBytes = arrays.stream()
- .mapToInt(array -> Integer.BYTES
- + Arrays.stream(array).mapToInt(str -> Integer.BYTES +
str.getBytes(UTF_8).length).sum())
- .max().orElse(0);
- try (VarByteChunkSVForwardIndexWriter writer = new
VarByteChunkSVForwardIndexWriter(file, compressionType,
- totalDocs, numDocsPerChunk, maxEntryLengthInBytes, version)) {
+ int maxEntryLengthInBytes = arrays.stream().mapToInt(
+ array -> Integer.BYTES + Arrays.stream(array).mapToInt(
+ str -> Integer.BYTES +
str.getBytes(UTF_8).length).sum()).max().orElse(0);
+ try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
+ numDocsPerChunk, maxEntryLengthInBytes, version)) {
for (String[] array : arrays) {
writer.putByteArrays(Arrays.stream(array).map(str ->
str.getBytes(UTF_8)).toArray(byte[][]::new));
}
@@ -167,17 +160,16 @@ public class VarByteChunkSVForwardIndexWriterTest {
private static Iterator<String> generateStrings(int minLength, int
maxLength) {
SplittableRandom random = new SplittableRandom();
- return IntStream.generate(() -> random.nextInt(minLength, maxLength + 1))
- .mapToObj(length -> {
- char[] string = new char[length];
- Arrays.fill(string, 'b');
- if (string.length > 0) {
- string[0] = 'a';
- }
- if (string.length > 1) {
- string[string.length - 1] = 'c';
- }
- return new String(string);
- }).iterator();
+ return IntStream.generate(() -> random.nextInt(minLength, maxLength +
1)).mapToObj(length -> {
+ char[] string = new char[length];
+ Arrays.fill(string, 'b');
+ if (string.length > 0) {
+ string[0] = 'a';
+ }
+ if (string.length > 1) {
+ string[string.length - 1] = 'c';
+ }
+ return new String(string);
+ }).iterator();
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 391f5d03a8..9e9a593691 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -29,14 +29,16 @@ import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -63,7 +65,6 @@ import org.testng.annotations.Test;
* Class for testing Raw index creators.
*/
public class RawIndexCreatorTest {
-
private static final int NUM_ROWS = 10009;
private static final int MAX_STRING_LENGTH = 101;
@@ -168,8 +169,9 @@ public class RawIndexCreatorTest {
public void testStringRawIndexCreator()
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
- try (VarByteChunkSVForwardIndexReader rawIndexReader = new
VarByteChunkSVForwardIndexReader(indexBuffer,
- DataType.STRING); ChunkReaderContext readerContext =
rawIndexReader.createContext()) {
+ try (
+ ForwardIndexReader rawIndexReader =
ForwardIndexReaderFactory.createRawIndexReader(indexBuffer, DataType.STRING,
+ true); ForwardIndexReaderContext readerContext =
rawIndexReader.createContext()) {
_recordReader.rewind();
for (int row = 0; row < NUM_ROWS; row++) {
GenericRow expectedRow = _recordReader.next();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
index 857e29bdb0..ba59d057a7 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -89,7 +89,7 @@ public class VarByteChunkV4Test {
throws IOException {
_file = new File(TEST_DIR, "testStringSV");
testSV(compressionType, longestEntry, chunkSize,
FieldSpec.DataType.STRING, x -> x,
- VarByteChunkSVForwardIndexWriterV4::putString, (reader, context,
docId) -> reader.getString(docId, context));
+ VarByteChunkForwardIndexWriterV4::putString, (reader, context, docId)
-> reader.getString(docId, context));
}
@Test(dataProvider = "params")
@@ -97,16 +97,16 @@ public class VarByteChunkV4Test {
throws IOException {
_file = new File(TEST_DIR, "testBytesSV");
testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES,
x -> x.getBytes(StandardCharsets.UTF_8),
- VarByteChunkSVForwardIndexWriterV4::putBytes, (reader, context, docId)
-> reader.getBytes(docId, context));
+ VarByteChunkForwardIndexWriterV4::putBytes, (reader, context, docId)
-> reader.getBytes(docId, context));
}
private <T> void testSV(ChunkCompressionType compressionType, int
longestEntry, int chunkSize,
FieldSpec.DataType dataType, Function<String, T> forwardMapper,
- BiConsumer<VarByteChunkSVForwardIndexWriterV4, T> write,
+ BiConsumer<VarByteChunkForwardIndexWriterV4, T> write,
Read<T> read)
throws IOException {
List<T> values = randomStrings(1000,
longestEntry).map(forwardMapper).collect(Collectors.toList());
- try (VarByteChunkSVForwardIndexWriterV4 writer = new
VarByteChunkSVForwardIndexWriterV4(_file, compressionType,
+ try (VarByteChunkForwardIndexWriterV4 writer = new
VarByteChunkForwardIndexWriterV4(_file, compressionType,
chunkSize)) {
for (T value : values) {
write.accept(writer, value);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index 0e2386ba82..43d1e35ad0 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
import java.util.Random;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkForwardIndexWriter;
import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
@@ -38,14 +38,13 @@ import org.testng.annotations.Test;
/**
- * Unit test for {@link FixedByteChunkSVForwardIndexReader} and {@link
FixedByteChunkSVForwardIndexWriter} classes.
+ * Unit test for {@link FixedByteChunkSVForwardIndexReader} and {@link
FixedByteChunkForwardIndexWriter} classes.
*
- * This test writes {@link #NUM_VALUES} using {@link
FixedByteChunkSVForwardIndexWriter}. It then reads
+ * This test writes {@link #NUM_VALUES} using {@link
FixedByteChunkForwardIndexWriter}. It then reads
* the values using {@link FixedByteChunkSVForwardIndexReader}, and asserts
that what was written is the same as
* what was read in.
*
* Number of docs and docs per chunk are chosen to generate complete as well
partial chunks.
- *
*/
public class FixedByteChunkSVForwardIndexTest {
private static final int NUM_VALUES = 10009;
@@ -75,10 +74,10 @@ public class FixedByteChunkSVForwardIndexTest {
FileUtils.deleteQuietly(outFileEightByte);
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
- try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Integer.BYTES, version);
- FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Integer.BYTES, version)) {
+ try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileFourByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES,
version);
+ FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileEightByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES,
version)) {
for (int value : expected) {
fourByteOffsetWriter.putInt(value);
eightByteOffsetWriter.putInt(value);
@@ -123,10 +122,10 @@ public class FixedByteChunkSVForwardIndexTest {
FileUtils.deleteQuietly(outFileEightByte);
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
- try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Long.BYTES, version);
- FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Long.BYTES, version)) {
+ try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileFourByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES, version);
+ FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileEightByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES,
version)) {
for (long value : expected) {
fourByteOffsetWriter.putLong(value);
eightByteOffsetWriter.putLong(value);
@@ -171,10 +170,10 @@ public class FixedByteChunkSVForwardIndexTest {
FileUtils.deleteQuietly(outFileEightByte);
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
- try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Float.BYTES, version);
- FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Float.BYTES, version)) {
+ try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileFourByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES, version);
+ FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileEightByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES,
version)) {
for (float value : expected) {
fourByteOffsetWriter.putFloat(value);
eightByteOffsetWriter.putFloat(value);
@@ -219,10 +218,10 @@ public class FixedByteChunkSVForwardIndexTest {
FileUtils.deleteQuietly(outFileEightByte);
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
- try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Double.BYTES, version);
- FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkSVForwardIndexWriter(
- outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK,
Double.BYTES, version)) {
+ try (FixedByteChunkForwardIndexWriter fourByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileFourByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES,
version);
+ FixedByteChunkForwardIndexWriter eightByteOffsetWriter = new
FixedByteChunkForwardIndexWriter(outFileEightByte,
+ compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES,
version)) {
for (double value : expected) {
fourByteOffsetWriter.putDouble(value);
eightByteOffsetWriter.putDouble(value);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
index b686d69ba8..2210211d71 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
@@ -32,7 +32,9 @@ import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
public class ForwardIndexTypeTest {
@@ -152,7 +154,7 @@ public class ForwardIndexTypeTest {
new ForwardIndexConfig.Builder()
.withCompressionType(null)
.withDeriveNumDocsPerChunk(false)
- .withRawIndexWriterVersion(2)
+
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.build()
);
}
@@ -174,7 +176,7 @@ public class ForwardIndexTypeTest {
new ForwardIndexConfig.Builder()
.withCompressionType(compression == null ? null :
ChunkCompressionType.valueOf(compression))
.withDeriveNumDocsPerChunk(false)
- .withRawIndexWriterVersion(2)
+
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.build()
);
}
@@ -195,7 +197,7 @@ public class ForwardIndexTypeTest {
assertEquals(new ForwardIndexConfig.Builder()
.withCompressionType(null)
.withDeriveNumDocsPerChunk(true)
- .withRawIndexWriterVersion(2)
+
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.build());
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
index 419836c38a..17f169081b 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
@@ -27,8 +27,7 @@ import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
@@ -44,7 +43,7 @@ import static org.testng.Assert.fail;
/**
- * Unit test for {@link VarByteChunkSVForwardIndexReader} and {@link
VarByteChunkSVForwardIndexWriter} classes.
+ * Unit test for {@link VarByteChunkSVForwardIndexReader} and {@link
VarByteChunkForwardIndexWriter} classes.
*/
public class VarByteChunkSVForwardIndexTest {
private static final int NUM_ENTRIES = 5003;
@@ -77,7 +76,7 @@ public class VarByteChunkSVForwardIndexTest {
}
/**
- * This test writes {@link #NUM_ENTRIES} using {@link
VarByteChunkSVForwardIndexWriter}. It then reads
+ * This test writes {@link #NUM_ENTRIES} using {@link
VarByteChunkForwardIndexWriter}. It then reads
* the strings & bytes using {@link VarByteChunkSVForwardIndexReader}, and
asserts that what was written is the
* same as
* what was read in.
@@ -105,12 +104,10 @@ public class VarByteChunkSVForwardIndexTest {
}
// test both formats (4-byte chunk offsets and 8-byte chunk offsets)
- try (VarByteChunkSVForwardIndexWriter fourByteOffsetWriter = new
VarByteChunkSVForwardIndexWriter(outFileFourByte,
- compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK,
maxStringLengthInBytes,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
- VarByteChunkSVForwardIndexWriter eightByteOffsetWriter = new
VarByteChunkSVForwardIndexWriter(outFileEightByte,
- compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK,
maxStringLengthInBytes,
- BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+ try (VarByteChunkForwardIndexWriter fourByteOffsetWriter = new
VarByteChunkForwardIndexWriter(outFileFourByte,
+ compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK,
maxStringLengthInBytes, 2);
+ VarByteChunkForwardIndexWriter eightByteOffsetWriter = new
VarByteChunkForwardIndexWriter(outFileEightByte,
+ compressionType, NUM_ENTRIES, NUM_DOCS_PER_CHUNK,
maxStringLengthInBytes, 3)) {
// NOTE: No need to test BYTES explicitly because STRING is handled as
UTF-8 encoded bytes
for (int i = 0; i < NUM_ENTRIES; i++) {
fourByteOffsetWriter.putString(expected[i]);
@@ -120,12 +117,10 @@ public class VarByteChunkSVForwardIndexTest {
try (VarByteChunkSVForwardIndexReader fourByteOffsetReader = new
VarByteChunkSVForwardIndexReader(
PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte),
DataType.STRING);
- ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
- .createContext();
+ ChunkReaderContext fourByteOffsetReaderContext =
fourByteOffsetReader.createContext();
VarByteChunkSVForwardIndexReader eightByteOffsetReader = new
VarByteChunkSVForwardIndexReader(
PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
DataType.STRING);
- ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
- .createContext()) {
+ ChunkReaderContext eightByteOffsetReaderContext =
eightByteOffsetReader.createContext()) {
for (int i = 0; i < NUM_ENTRIES; i++) {
Assert.assertEquals(fourByteOffsetReader.getString(i,
fourByteOffsetReaderContext), expected[i]);
Assert.assertEquals(eightByteOffsetReader.getString(i,
eightByteOffsetReaderContext), expected[i]);
@@ -230,8 +225,8 @@ public class VarByteChunkSVForwardIndexTest {
}
int numDocsPerChunk =
SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes);
- try (VarByteChunkSVForwardIndexWriter writer = new
VarByteChunkSVForwardIndexWriter(outFile, compressionType,
- numDocs, numDocsPerChunk, maxStringLengthInBytes,
BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+ try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(outFile, compressionType, numDocs,
+ numDocsPerChunk, maxStringLengthInBytes, 3)) {
// NOTE: No need to test BYTES explicitly because STRING is handled as
UTF-8 encoded bytes
for (int i = 0; i < numDocs; i++) {
writer.putString(expected[i]);
@@ -274,7 +269,7 @@ public class VarByteChunkSVForwardIndexTest {
file.deleteOnExit();
int docSize = 21475;
byte[] value = StringUtils.repeat("a", docSize).getBytes(UTF_8);
- try (VarByteChunkSVForwardIndexWriter writer = new
VarByteChunkSVForwardIndexWriter(file,
+ try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(file,
ChunkCompressionType.PASS_THROUGH, 100_001, 1000, docSize, 2)) {
try {
for (int i = 0; i < 100_000; i++) {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index b7723f1294..d29c62cd34 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -31,13 +31,13 @@ import
org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -319,7 +319,7 @@ public class DictionaryToRawIndexConverter {
try (ForwardIndexCreator rawIndexCreator = ForwardIndexCreatorFactory
.getRawIndexCreatorForSVColumn(newSegment, compressionType, column,
storedType, numDocs, lengthOfLongestEntry,
- false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ false, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext()) {
switch (storedType) {
case INT:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]