This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch hotfix_chunkwriter_realtime in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4ef4e24456fa4b5f896344b577338ef0e5f0d952 Author: Sidd <[email protected]> AuthorDate: Thu Apr 23 09:49:35 2020 -0700 Use 8byte offsets in chunk based raw index creator (#5285) * Use 8byte offsets in chunk based raw index creator * cleanup * fixed tests * Fix tests and address review comments * Use 8-byte offset for fixed-byte chunk writer. Add backward compatibility test Co-authored-by: Siddharth Teotia <[email protected]> --- .../reader/impl/v1/BaseChunkSingleValueReader.java | 27 +++++--- .../impl/v1/VarByteChunkSingleValueReader.java | 6 +- .../writer/impl/v1/BaseChunkSingleValueWriter.java | 30 +++++++-- .../impl/v1/FixedByteChunkSingleValueWriter.java | 5 +- .../impl/v1/VarByteChunkSingleValueWriter.java | 8 ++- .../segment/memory/PinotNativeOrderLBuffer.java | 2 +- .../segment/memory/PinotNonNativeOrderLBuffer.java | 2 +- .../FixedByteChunkSingleValueReaderWriteTest.java | 25 +++++--- .../VarByteChunkSingleValueReaderWriteTest.java | 68 ++++++++++++++++----- .../src/test/resources/data/fixedByteCompressed.v2 | Bin 0 -> 8098 bytes pinot-core/src/test/resources/data/fixedByteRaw.v2 | Bin 0 -> 16036 bytes .../resources/data/varByteStringsCompressed.v2 | Bin 0 -> 17355 bytes .../src/test/resources/data/varByteStringsRaw.v2 | Bin 0 -> 286902 bytes 13 files changed, 127 insertions(+), 46 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/BaseChunkSingleValueReader.java b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/BaseChunkSingleValueReader.java index 6cd86c1..24972ad 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/BaseChunkSingleValueReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/BaseChunkSingleValueReader.java @@ -18,12 +18,15 @@ */ package org.apache.pinot.core.io.reader.impl.v1; +import com.google.common.base.Preconditions; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.pinot.core.io.compression.ChunkCompressorFactory; import org.apache.pinot.core.io.compression.ChunkDecompressor; import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader; import org.apache.pinot.core.io.reader.impl.ChunkReaderContext; +import org.apache.pinot.core.io.writer.impl.v1.BaseChunkSingleValueWriter; +import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter; import org.apache.pinot.core.segment.memory.PinotDataBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +50,8 @@ public abstract class BaseChunkSingleValueReader extends BaseSingleColumnSingleV protected final int _numDocsPerChunk; protected final int _numChunks; protected final int _lengthOfLongestEntry; + private final int _version; + private final int _headerEntryChunkOffsetSize; /** * Constructor for the class. @@ -57,7 +62,7 @@ public abstract class BaseChunkSingleValueReader extends BaseSingleColumnSingleV _dataBuffer = pinotDataBuffer; int headerOffset = 0; - int version = _dataBuffer.getInt(headerOffset); + _version = _dataBuffer.getInt(headerOffset); headerOffset += Integer.BYTES; _numChunks = _dataBuffer.getInt(headerOffset); @@ -70,7 +75,7 @@ public abstract class BaseChunkSingleValueReader extends BaseSingleColumnSingleV headerOffset += Integer.BYTES; int dataHeaderStart = headerOffset; - if (version > 1) { + if (_version > 1) { _dataBuffer.getInt(headerOffset); // Total docs headerOffset += Integer.BYTES; @@ -87,9 +92,10 @@ public abstract class BaseChunkSingleValueReader extends BaseSingleColumnSingleV } _chunkSize = (_lengthOfLongestEntry * _numDocsPerChunk); + _headerEntryChunkOffsetSize = BaseChunkSingleValueWriter.getHeaderEntryChunkOffsetSize(_version); // Slice out the header from the data buffer. - int dataHeaderLength = _numChunks * Integer.BYTES; + int dataHeaderLength = _numChunks * _headerEntryChunkOffsetSize; int rawDataStart = dataHeaderStart + dataHeaderLength; _dataHeader = _dataBuffer.view(dataHeaderStart, rawDataStart); @@ -120,14 +126,14 @@ public abstract class BaseChunkSingleValueReader extends BaseSingleColumnSingleV } int chunkSize; - int chunkPosition = getChunkPosition(chunkId); + long chunkPosition = getChunkPosition(chunkId); // Size of chunk can be determined using next chunks offset, or end of data buffer for last chunk. if (chunkId == (_numChunks - 1)) { // Last chunk. chunkSize = (int) (_dataBuffer.size() - chunkPosition); } else { - int nextChunkOffset = getChunkPosition(chunkId + 1); - chunkSize = nextChunkOffset - chunkPosition; + long nextChunkOffset = getChunkPosition(chunkId + 1); + chunkSize = (int)(nextChunkOffset - chunkPosition); } ByteBuffer decompressedBuffer = context.getChunkBuffer(); @@ -145,12 +151,15 @@ public abstract class BaseChunkSingleValueReader extends BaseSingleColumnSingleV /** * Helper method to get the offset of the chunk in the data. - * * @param chunkId Id of the chunk for which to return the position. * @return Position (offset) of the chunk in the data. */ - protected int getChunkPosition(int chunkId) { - return _dataHeader.getInt(chunkId * Integer.BYTES); + protected long getChunkPosition(int chunkId) { + if (_headerEntryChunkOffsetSize == Integer.BYTES) { + return _dataHeader.getInt(chunkId * _headerEntryChunkOffsetSize); + } else { + return _dataHeader.getLong(chunkId * _headerEntryChunkOffsetSize); + } } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java index 82fcb2b..418a0f3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java @@ -55,7 +55,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader { int chunkRowId = row % _numDocsPerChunk; ByteBuffer chunkBuffer = getChunkForRow(row, context); - int rowOffset = chunkBuffer.getInt(chunkRowId * Integer.BYTES); + int rowOffset = chunkBuffer.getInt(chunkRowId * VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE); int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer); int length = nextRowOffset - rowOffset; @@ -77,7 +77,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader { int chunkRowId = row % _numDocsPerChunk; ByteBuffer chunkBuffer = getChunkForRow(row, context); - int rowOffset = chunkBuffer.getInt(chunkRowId * Integer.BYTES); + int rowOffset = chunkBuffer.getInt(chunkRowId * VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE); int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer); int length = nextRowOffset - rowOffset; @@ -109,7 +109,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader { // Last row in this trunk. nextRowOffset = chunkBuffer.limit(); } else { - nextRowOffset = chunkBuffer.getInt((currentRowId + 1) * Integer.BYTES); + nextRowOffset = chunkBuffer.getInt((currentRowId + 1) * VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE); // For incomplete chunks, the next string's offset will be 0 as row offset for absent rows are 0. if (nextRowOffset == 0) { nextRowOffset = chunkBuffer.limit(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/BaseChunkSingleValueWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/BaseChunkSingleValueWriter.java index 46399b7..597bd1f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/BaseChunkSingleValueWriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/BaseChunkSingleValueWriter.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.io.writer.impl.v1; +import com.google.common.base.Preconditions; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory; */ public abstract class BaseChunkSingleValueWriter implements SingleColumnSingleValueWriter { private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkSingleValueWriter.class); + private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 = Integer.BYTES; + private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES; protected final FileChannel _dataFile; protected ByteBuffer _header; @@ -45,7 +48,9 @@ public abstract class BaseChunkSingleValueWriter implements SingleColumnSingleVa protected final ChunkCompressor _chunkCompressor; protected int _chunkSize; - protected int _dataOffset; + protected long _dataOffset; + + private final int _headerEntryChunkOffsetSize; /** * Constructor for the class. @@ -64,13 +69,25 @@ public abstract class BaseChunkSingleValueWriter implements SingleColumnSingleVa throws FileNotFoundException { _chunkSize = chunkSize; _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType); - + _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version); _dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version); _chunkBuffer = ByteBuffer.allocateDirect(chunkSize); _compressedBuffer = ByteBuffer.allocateDirect(chunkSize * 2); _dataFile = new RandomAccessFile(file, "rw").getChannel(); } + public static int getHeaderEntryChunkOffsetSize(int version) { + switch (version) { + case 1: + case 2: + return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2; + case 3: + return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3; + default: + throw new IllegalStateException("Invalid version: " + version); + } + } + @Override public void setChar(int row, char ch) { throw new UnsupportedOperationException(); @@ -139,7 +156,7 @@ public abstract class BaseChunkSingleValueWriter implements SingleColumnSingleVa private int writeHeader(ChunkCompressorFactory.CompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry, int version) { int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk; - int headerSize = (numChunks + 7) * Integer.BYTES; // 7 items written before chunk indexing. + int headerSize = (7 * Integer.BYTES) + (numChunks * _headerEntryChunkOffsetSize); _header = ByteBuffer.allocateDirect(headerSize); @@ -196,7 +213,12 @@ public abstract class BaseChunkSingleValueWriter implements SingleColumnSingleVa throw new RuntimeException(e); } - _header.putInt(_dataOffset); + if (_headerEntryChunkOffsetSize == Integer.BYTES) { + _header.putInt((int)_dataOffset); + } else if (_headerEntryChunkOffsetSize == Long.BYTES) { + _header.putLong(_dataOffset); + } + _dataOffset += sizeToWrite; _chunkBuffer.clear(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedByteChunkSingleValueWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedByteChunkSingleValueWriter.java index 4894591..2457e88 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedByteChunkSingleValueWriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/FixedByteChunkSingleValueWriter.java @@ -40,7 +40,8 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory; * <li> Integer: Total number of docs (version 2 onwards). </li> * <li> Integer: Compression type enum value (version 2 onwards). </li> * <li> Integer: Start offset of data header (version 2 onwards). </li> - * <li> Integer array: Integer offsets for all chunks in the data .</li> + * <li> Integer array: Integer offsets for all chunks in the data (upto version 2), + * Long array: Long offsets for all chunks in the data (version 3 onwards) </li> * </ul> * * <p> Individual Chunks: </p> @@ -53,7 +54,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory; @NotThreadSafe public class FixedByteChunkSingleValueWriter extends BaseChunkSingleValueWriter { - private static final int CURRENT_VERSION = 2; + private static final int CURRENT_VERSION = 3; private int _chunkDataOffset; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java index 950c003..8a2561b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java @@ -36,7 +36,11 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory; * <li> Integer: Total number of chunks. </li> * <li> Integer: Number of docs per chunk. </li> * <li> Integer: Length of longest entry (in bytes). </li> - * <li> Integer array: Integer offsets for all chunks in the data .</li> + * <li> Integer: Total number of docs (version 2 onwards). </li> + * <li> Integer: Compression type enum value (version 2 onwards). </li> + * <li> Integer: Start offset of data header (version 2 onwards). </li> + * <li> Integer array: Integer offsets for all chunks in the data (upto version 2), + * Long array: Long offsets for all chunks in the data (version 3 onwards) </li> * </ul> * * <p> Individual Chunks: </p> @@ -49,7 +53,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory; */ @NotThreadSafe public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter { - private static final int CURRENT_VERSION = 2; + private static final int CURRENT_VERSION = 3; public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES; private final int _chunkHeaderSize; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNativeOrderLBuffer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNativeOrderLBuffer.java index cf7af29..8f8c97c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNativeOrderLBuffer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNativeOrderLBuffer.java @@ -43,7 +43,7 @@ public class PinotNativeOrderLBuffer extends BasePinotLBuffer { return buffer; } - static PinotNativeOrderLBuffer mapFile(File file, boolean readOnly, long offset, long size) + public static PinotNativeOrderLBuffer mapFile(File file, boolean readOnly, long offset, long size) throws IOException { if (readOnly) { return new PinotNativeOrderLBuffer(new MMapBuffer(file, offset, size, MMapMode.READ_ONLY), true, false); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNonNativeOrderLBuffer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNonNativeOrderLBuffer.java index e14c784..b597686 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNonNativeOrderLBuffer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotNonNativeOrderLBuffer.java @@ -43,7 +43,7 @@ public class PinotNonNativeOrderLBuffer extends BasePinotLBuffer { return buffer; } - static PinotNonNativeOrderLBuffer mapFile(File file, boolean readOnly, long offset, long size) + public static PinotNonNativeOrderLBuffer mapFile(File file, boolean readOnly, long offset, long size) throws IOException { if (readOnly) { return new PinotNonNativeOrderLBuffer(new MMapBuffer(file, offset, size, MMapMode.READ_ONLY), true, false); diff --git a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/FixedByteChunkSingleValueReaderWriteTest.java b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/FixedByteChunkSingleValueReaderWriteTest.java index 92d1f3a..b670486 100644 --- a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/FixedByteChunkSingleValueReaderWriteTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/FixedByteChunkSingleValueReaderWriteTest.java @@ -260,25 +260,32 @@ public class FixedByteChunkSingleValueReaderWriteTest { * @throws IOException */ @Test - public void testBackwardCompatibility() - throws IOException { - // Get v1 from resources folder + public void testBackwardCompatibilityV1() + throws Exception { + testBackwardCompatibilityHelper("data/fixedByteSVRDoubles.v1", 10009, 0); + } + + @Test + public void testBackwardCompatibilityV2() + throws Exception { + testBackwardCompatibilityHelper("data/fixedByteCompressed.v2", 2000, 100.2356); + testBackwardCompatibilityHelper("data/fixedByteRaw.v2", 2000, 100.2356); + } + + private void testBackwardCompatibilityHelper(String fileName, int numDocs, double startValue) + throws Exception { ClassLoader classLoader = getClass().getClassLoader(); - String fileName = "data/fixedByteSVRDoubles.v1"; URL resource = classLoader.getResource(fileName); if (resource == null) { throw new RuntimeException("Input file not found: " + fileName); } - File file = new File(resource.getFile()); try (FixedByteChunkSingleValueReader reader = new FixedByteChunkSingleValueReader( PinotDataBuffer.mapReadOnlyBigEndianFile(file))) { ChunkReaderContext context = reader.createContext(); - - int numEntries = 10009; // Number of entries in the input file. - for (int i = 0; i < numEntries; i++) { + for (int i = 0; i < numDocs; i++) { double actual = reader.getDouble(i, context); - Assert.assertEquals(actual, (double) i); + Assert.assertEquals(actual, i + startValue); } } } diff --git a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java index 659bbb2..334a935 100644 --- a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java @@ -21,16 +21,20 @@ package org.apache.pinot.index.readerwriter; import java.io.File; import java.io.IOException; import java.net.URL; +import java.nio.ByteOrder; import java.nio.charset.Charset; import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.RandomStringUtils; +import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.core.io.compression.ChunkCompressorFactory; import org.apache.pinot.core.io.reader.impl.ChunkReaderContext; import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader; import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter; import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.apache.pinot.core.segment.memory.PinotNativeOrderLBuffer; +import org.apache.pinot.core.segment.memory.PinotNonNativeOrderLBuffer; import org.testng.Assert; import org.testng.annotations.Test; @@ -113,27 +117,37 @@ public class VarByteChunkSingleValueReaderWriteTest { * @throws IOException */ @Test - public void testBackwardCompatibility() - throws IOException { + public void testBackwardCompatibilityV1() + throws Exception { String[] expected = new String[]{"abcde", "fgh", "ijklmn", "12345"}; + testBackwardCompatibilityHelper("data/varByteStrings.v1", expected, 1009); + } - // Get v1 from resources folder + /** + * This test ensures that the reader can read in an data file from version 2. + */ + @Test + public void testBackwardCompatibilityV2() + throws Exception { + String[] data = {"abcdefghijk", "12456887", "pqrstuv", "500"}; + testBackwardCompatibilityHelper("data/varByteStringsCompressed.v2", data, 1000); + testBackwardCompatibilityHelper("data/varByteStringsRaw.v2", data, 1000); + } + + private void testBackwardCompatibilityHelper(String fileName, String[] data, int numDocs) + throws Exception { ClassLoader classLoader = getClass().getClassLoader(); - String fileName = "data/varByteStrings.v1"; URL resource = classLoader.getResource(fileName); if (resource == null) { throw new RuntimeException("Input file not found: " + fileName); } - File file = new File(resource.getFile()); try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader( PinotDataBuffer.mapReadOnlyBigEndianFile(file))) { ChunkReaderContext context = reader.createContext(); - - int numEntries = 1009; // Number of entries in the input file. - for (int i = 0; i < numEntries; i++) { + for (int i = 0; i < numDocs; i++) { String actual = reader.getString(i, context); - Assert.assertEquals(actual, expected[i % expected.length]); + Assert.assertEquals(actual, data[i % data.length]); } } } @@ -173,7 +187,7 @@ public class VarByteChunkSingleValueReaderWriteTest { int maxStringLengthInBytes = 0; for (int i = 0; i < numDocs; i++) { expected[i] = RandomStringUtils.random(random.nextInt(numChars)); - maxStringLengthInBytes = Math.max(maxStringLengthInBytes, expected[i].getBytes(UTF_8).length); + maxStringLengthInBytes = Math.max(maxStringLengthInBytes, StringUtil.encodeUtf8(expected[i]).length); } int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes); @@ -183,20 +197,44 @@ public class VarByteChunkSingleValueReaderWriteTest { for (int i = 0; i < numDocs; i += 2) { writer.setString(i, expected[i]); - writer.setBytes(i + 1, expected[i].getBytes(UTF_8)); + writer.setBytes(i + 1, StringUtil.encodeUtf8(expected[i])); } writer.close(); - try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFile))) { + PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(outFile); + try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(buffer)) { ChunkReaderContext context = reader.createContext(); + for (int i = 0; i < numDocs; i += 2) { + String actual = reader.getString(i, context); + Assert.assertEquals(actual, expected[i]); + byte[] expectedBytes = StringUtil.encodeUtf8(expected[i]); + Assert.assertEquals(StringUtil.encodeUtf8(actual), expectedBytes); + Assert.assertEquals(reader.getBytes(i + 1, context), expectedBytes); + } + } + + // For large variable width column values (where total size of data + // across all rows in the segment is > 2GB), LBuffer will be used for + // reading the fwd index. However, to test this scenario the unit test + // will take a long time to execute due to comparison + // (75000 characters in each row and 10000 rows will hit this scenario). + // So we specifically test for mapping the index file into a LBuffer + // to exercise the LBuffer code + if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) { + buffer = PinotNativeOrderLBuffer.mapFile(outFile, true, 0, outFile.length()); + } else { + buffer = PinotNonNativeOrderLBuffer.mapFile(outFile, true, 0, outFile.length()); + } + try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(buffer)) { + ChunkReaderContext context = reader.createContext(); for (int i = 0; i < numDocs; i += 2) { String actual = reader.getString(i, context); Assert.assertEquals(actual, expected[i]); - Assert.assertEquals(actual.getBytes(UTF_8), expected[i].getBytes(UTF_8)); - Assert.assertEquals(reader.getBytes(i + 1), expected[i].getBytes(UTF_8)); + byte[] expectedBytes = StringUtil.encodeUtf8(expected[i]); + Assert.assertEquals(StringUtil.encodeUtf8(actual), expectedBytes); + Assert.assertEquals(reader.getBytes(i + 1, context), expectedBytes); } } diff --git a/pinot-core/src/test/resources/data/fixedByteCompressed.v2 b/pinot-core/src/test/resources/data/fixedByteCompressed.v2 new file mode 100644 index 0000000..e2ee3a8 Binary files /dev/null and b/pinot-core/src/test/resources/data/fixedByteCompressed.v2 differ diff --git a/pinot-core/src/test/resources/data/fixedByteRaw.v2 b/pinot-core/src/test/resources/data/fixedByteRaw.v2 new file mode 100644 index 0000000..a9d6d34 Binary files /dev/null and b/pinot-core/src/test/resources/data/fixedByteRaw.v2 differ diff --git a/pinot-core/src/test/resources/data/varByteStringsCompressed.v2 b/pinot-core/src/test/resources/data/varByteStringsCompressed.v2 new file mode 100644 index 0000000..ef3da91 Binary files /dev/null and b/pinot-core/src/test/resources/data/varByteStringsCompressed.v2 differ diff --git a/pinot-core/src/test/resources/data/varByteStringsRaw.v2 b/pinot-core/src/test/resources/data/varByteStringsRaw.v2 new file mode 100644 index 0000000..0f838dc Binary files /dev/null and b/pinot-core/src/test/resources/data/varByteStringsRaw.v2 differ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
