This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch hotfix_chunkwriter_realtime
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 34080af27f2b94f24ea54aa488c767eff846879f Author: Sidd <siddharthteo...@gmail.com> AuthorDate: Thu Apr 16 22:29:17 2020 -0700 Derive num docs per chunk from max column value length for varbyte raw index creator (#5256) * Derive numDocsPerChunk from max column value length for var byte raw forward index creator * review comments Co-authored-by: Siddharth Teotia <steo...@steotia-mn1.linkedin.biz> --- .../impl/v1/VarByteChunkSingleValueReader.java | 4 +- .../impl/v1/VarByteChunkSingleValueWriter.java | 7 ++- .../fwd/SingleValueVarByteRawIndexCreator.java | 11 +++- .../defaultcolumn/BaseDefaultColumnHandler.java | 3 +- .../loader/invertedindex/TextIndexHandler.java | 4 +- .../VarByteChunkSingleValueReaderWriteTest.java | 66 ++++++++++++++++++++++ 6 files changed, 85 insertions(+), 10 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java index 1ebbe32..82fcb2b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java @@ -42,9 +42,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader { */ public VarByteChunkSingleValueReader(PinotDataBuffer pinotDataBuffer) { super(pinotDataBuffer); - - int chunkHeaderSize = _numDocsPerChunk * Integer.BYTES; - _maxChunkSize = chunkHeaderSize + (_lengthOfLongestEntry * _numDocsPerChunk); + _maxChunkSize = _numDocsPerChunk * (VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + _lengthOfLongestEntry); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java index 2c6a299..950c003 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java @@ -50,6 +50,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory; @NotThreadSafe public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter { private static final int CURRENT_VERSION = 2; + public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES; private final int _chunkHeaderSize; private int _chunkHeaderOffset; @@ -70,11 +71,11 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter { throws FileNotFoundException { super(file, compressionType, totalDocs, numDocsPerChunk, - ((numDocsPerChunk * Integer.BYTES) + (lengthOfLongestEntry * numDocsPerChunk)), // chunkSize + numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize lengthOfLongestEntry, CURRENT_VERSION); _chunkHeaderOffset = 0; - _chunkHeaderSize = numDocsPerChunk * Integer.BYTES; + _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; _chunkDataOffSet = _chunkHeaderSize; } @@ -87,7 +88,7 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter { @Override public void setBytes(int row, byte[] bytes) { _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); - _chunkHeaderOffset += Integer.BYTES; + _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; _chunkBuffer.position(_chunkDataOffSet); _chunkBuffer.put(bytes); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java index 34879bd..a8d7e6b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.creator.impl.fwd; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import org.apache.pinot.core.io.compression.ChunkCompressorFactory; @@ -27,7 +28,7 @@ import org.apache.pinot.core.segment.creator.impl.V1Constants; public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCreator { - private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive this based on metadata. + private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024; private final VarByteChunkSingleValueWriter _indexWriter; @@ -35,7 +36,13 @@ public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCr String column, int totalDocs, int maxLength) throws IOException { File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION); - _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, maxLength); + _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, getNumDocsPerChunk(maxLength), maxLength); + } + + @VisibleForTesting + public static int getNumDocsPerChunk(int lengthOfLongestEntry) { + int overheadPerEntry = lengthOfLongestEntry + VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; + return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java index 447489d..df8b6cf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.index.loader.defaultcolumn; import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; +import java.nio.charset.Charset; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -338,7 +339,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler { int totalDocs = _segmentMetadata.getTotalDocs(); Object defaultValue = fieldSpec.getDefaultNullValue(); String stringDefaultValue = (String) defaultValue; - int lengthOfLongestEntry = stringDefaultValue.length(); + int lengthOfLongestEntry = StringUtil.encodeUtf8(stringDefaultValue).length; int dictionaryElementSize = 0; SingleValueVarByteRawIndexCreator rawIndexCreator = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java index 1c1786f..a11b25c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java @@ -44,6 +44,7 @@ import javax.annotation.Nonnull; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.core.io.reader.DataFileReader; +import org.apache.pinot.core.io.reader.impl.ChunkReaderContext; import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader; import org.apache.pinot.core.segment.creator.TextIndexType; import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator; @@ -162,8 +163,9 @@ public class TextIndexHandler { try (LuceneTextIndexCreator textIndexCreator = new 
LuceneTextIndexCreator(column, segmentDirectory, true)) { try (DataFileReader forwardIndexReader = getForwardIndexReader(columnMetadata)) { VarByteChunkSingleValueReader forwardIndex = (VarByteChunkSingleValueReader) forwardIndexReader; + ChunkReaderContext readerContext = forwardIndex.createContext(); for (int docID = 0; docID < numDocs; docID++) { - Object docToAdd = forwardIndex.getString(docID); + Object docToAdd = forwardIndex.getString(docID, readerContext); textIndexCreator.addDoc(docToAdd, docID); } textIndexCreator.seal(); diff --git a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java index d0d62e3..659bbb2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory; import org.apache.pinot.core.io.reader.impl.ChunkReaderContext; import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader; import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter; +import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; import org.apache.pinot.core.segment.memory.PinotDataBuffer; import org.testng.Assert; import org.testng.annotations.Test; @@ -136,4 +137,69 @@ public class VarByteChunkSingleValueReaderWriteTest { } } } + + @Test + public void testVarCharWithDifferentSizes() throws Exception { + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10, 1000); + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10, 1000); + + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100, 1000); + 
testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100, 1000); + + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000, 1000); + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000, 1000); + + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10000, 100); + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10000, 100); + + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100000, 10); + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100000, 10); + + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000000, 10); + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000000, 10); + + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 2000000, 10); + testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 2000000, 10); + } + + private void testLargeVarcharHelper(ChunkCompressorFactory.CompressionType compressionType, int numChars, int numDocs) + throws Exception { + String[] expected = new String[numDocs]; + Random random = new Random(); + + File outFile = new File(TEST_FILE); + FileUtils.deleteQuietly(outFile); + + int maxStringLengthInBytes = 0; + for (int i = 0; i < numDocs; i++) { + expected[i] = RandomStringUtils.random(random.nextInt(numChars)); + maxStringLengthInBytes = Math.max(maxStringLengthInBytes, expected[i].getBytes(UTF_8).length); + } + + int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes); + VarByteChunkSingleValueWriter writer = + new VarByteChunkSingleValueWriter(outFile, compressionType, numDocs, numDocsPerChunk, + maxStringLengthInBytes); + + for (int i = 0; i < numDocs; i += 2) { + writer.setString(i, expected[i]); + writer.setBytes(i + 1, expected[i].getBytes(UTF_8)); + } + + writer.close(); + + try (VarByteChunkSingleValueReader reader = new 
VarByteChunkSingleValueReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFile))) { + ChunkReaderContext context = reader.createContext(); + + for (int i = 0; i < numDocs; i += 2) { + String actual = reader.getString(i, context); + Assert.assertEquals(actual, expected[i]); + Assert.assertEquals(actual.getBytes(UTF_8), expected[i].getBytes(UTF_8)); + Assert.assertEquals(reader.getBytes(i + 1), expected[i].getBytes(UTF_8)); + } + } + + FileUtils.deleteQuietly(outFile); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org