This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 142a86f  Derive num docs per chunk from max column value length for varbyte raw index creator (#5256)
142a86f is described below
commit 142a86fd9976783cdc5416fa13eccd3b6dbfa5fa
Author: Sidd <[email protected]>
AuthorDate: Thu Apr 16 22:29:17 2020 -0700
    Derive num docs per chunk from max column value length for varbyte raw index creator (#5256)

    * Derive numDocsPerChunk from max column value length for var byte raw forward index creator
    * review comments

    Co-authored-by: Siddharth Teotia <[email protected]>
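For readers skimming the diff below: the new sizing logic targets a fixed per-chunk byte budget and derives the per-chunk doc count from the longest value. A minimal standalone sketch of that arithmetic (constants, names, and values mirror the ones introduced in SingleValueVarByteRawIndexCreator; the main method is illustrative only):

    public class ChunkSizingSketch {
      // Mirrors the constants this commit introduces (names and values from the diff).
      static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;                // 1 MiB budget per chunk
      static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES; // one 4-byte offset per row

      static int getNumDocsPerChunk(int lengthOfLongestEntry) {
        // Each doc costs at most its payload plus one header offset slot.
        int overheadPerEntry = lengthOfLongestEntry + CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
        return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
      }

      public static void main(String[] args) {
        System.out.println(getNumDocsPerChunk(100));       // 10082 docs when the longest value is 100 bytes
        System.out.println(getNumDocsPerChunk(2_000_000)); // 1 doc once a single value exceeds the budget
      }
    }

Previously NUM_DOCS_PER_CHUNK was hard-coded to 1000, so a single 2 MB value (the largest case exercised in the new test) implied a worst-case chunk of roughly 2 GB; deriving the count keeps each chunk near the 1 MiB target.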
---
.../impl/v1/VarByteChunkSingleValueReader.java | 4 +-
.../impl/v1/VarByteChunkSingleValueWriter.java | 7 ++-
.../fwd/SingleValueVarByteRawIndexCreator.java | 11 +++-
.../defaultcolumn/BaseDefaultColumnHandler.java | 3 +-
.../loader/invertedindex/TextIndexHandler.java | 4 +-
.../VarByteChunkSingleValueReaderWriteTest.java | 66 ++++++++++++++++++++++
6 files changed, 85 insertions(+), 10 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
index 1ebbe32..82fcb2b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
@@ -42,9 +42,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader {
    */
   public VarByteChunkSingleValueReader(PinotDataBuffer pinotDataBuffer) {
     super(pinotDataBuffer);
-
-    int chunkHeaderSize = _numDocsPerChunk * Integer.BYTES;
-    _maxChunkSize = chunkHeaderSize + (_lengthOfLongestEntry * _numDocsPerChunk);
+    _maxChunkSize = _numDocsPerChunk * (VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }

   @Override
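Note on this hunk: the rewritten expression is algebraically identical to the removed one; it only folds the header term into a factored form that names the 4-byte per-row offset. A quick check with hypothetical values (n = docs per chunk, len = longest entry; both numbers are illustrative, not from the patch):

    int n = 1000, len = 128;                        // hypothetical values
    int oldMax = (n * Integer.BYTES) + (len * n);   // header + data, as previously written
    int newMax = n * (Integer.BYTES + len);         // factored form used by this patch
    // Both evaluate to 132000 bytes.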
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
index 2c6a299..950c003 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
@@ -50,6 +50,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 @NotThreadSafe
 public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
   private static final int CURRENT_VERSION = 2;
+  public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;

   private final int _chunkHeaderSize;
   private int _chunkHeaderOffset;
@@ -70,11 +71,11 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
       throws FileNotFoundException {
     super(file, compressionType, totalDocs, numDocsPerChunk,
-        ((numDocsPerChunk * Integer.BYTES) + (lengthOfLongestEntry * numDocsPerChunk)), // chunkSize
+        numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize
         lengthOfLongestEntry, CURRENT_VERSION);
     _chunkHeaderOffset = 0;
-    _chunkHeaderSize = numDocsPerChunk * Integer.BYTES;
+    _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
     _chunkDataOffSet = _chunkHeaderSize;
   }
@@ -87,7 +88,7 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
   @Override
   public void setBytes(int row, byte[] bytes) {
     _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
-    _chunkHeaderOffset += Integer.BYTES;
+    _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
     _chunkBuffer.position(_chunkDataOffSet);
     _chunkBuffer.put(bytes);
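For context, the per-chunk layout these writes produce (a sketch inferred from the writer, not text from the patch): a header of one 4-byte absolute offset per row, followed by the variable-length row payloads.

    // [ offset(row 0) | offset(row 1) | ... | offset(row n-1) ]  <- n * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE bytes
    // [ row 0 bytes   | row 1 bytes   | ... | row n-1 bytes   ]  <- up to n * lengthOfLongestEntry bytes
    // setBytes() records the current data offset in the next header slot, then appends the payload.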
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
index 34879bd..a8d7e6b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator.impl.fwd;

+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
@@ -27,7 +28,7 @@ import org.apache.pinot.core.segment.creator.impl.V1Constants;

 public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCreator {
-  private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive this based on metadata.
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;

   private final VarByteChunkSingleValueWriter _indexWriter;
@@ -35,7 +36,13 @@ public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCr
       String column, int totalDocs, int maxLength)
       throws IOException {
     File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
-    _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, maxLength);
+    _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, getNumDocsPerChunk(maxLength), maxLength);
+  }
+
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry = lengthOfLongestEntry + VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
   }

   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 447489d..df8b6cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.index.loader.defaultcolumn;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -338,7 +339,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     int totalDocs = _segmentMetadata.getTotalDocs();
     Object defaultValue = fieldSpec.getDefaultNullValue();
     String stringDefaultValue = (String) defaultValue;
-    int lengthOfLongestEntry = stringDefaultValue.length();
+    int lengthOfLongestEntry = StringUtil.encodeUtf8(stringDefaultValue).length;
     int dictionaryElementSize = 0;

     SingleValueVarByteRawIndexCreator rawIndexCreator =
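The one-line fix above matters for multi-byte default values: String.length() counts UTF-16 code units, while the writer sizes chunks in bytes. A small illustration (assuming StringUtil.encodeUtf8 is plain UTF-8 encoding, as its name suggests):

    String defaultValue = "日本語";                                    // 3 characters
    int chars = defaultValue.length();                                 // 3
    int bytes = defaultValue.getBytes(StandardCharsets.UTF_8).length;  // 9 (3 bytes per char in UTF-8)
    // Using the character count here would understate lengthOfLongestEntry and undersize the chunks.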
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
index 1c1786f..a11b25c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -44,6 +44,7 @@ import javax.annotation.Nonnull;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.segment.creator.TextIndexType;
 import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
@@ -162,8 +163,9 @@ public class TextIndexHandler {
     try (LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
       try (DataFileReader forwardIndexReader = getForwardIndexReader(columnMetadata)) {
         VarByteChunkSingleValueReader forwardIndex = (VarByteChunkSingleValueReader) forwardIndexReader;
+        ChunkReaderContext readerContext = forwardIndex.createContext();
         for (int docID = 0; docID < numDocs; docID++) {
-          Object docToAdd = forwardIndex.getString(docID);
+          Object docToAdd = forwardIndex.getString(docID, readerContext);
           textIndexCreator.addDoc(docToAdd, docID);
         }
         textIndexCreator.seal();
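The added ChunkReaderContext follows the reader's contextual access pattern: create one context up front and pass it to every getString call, so sequential reads can reuse the most recently decompressed chunk instead of re-reading it per document. The pattern in isolation (names taken from the hunk above; the consuming comment is a placeholder):

    ChunkReaderContext readerContext = forwardIndex.createContext();
    for (int docID = 0; docID < numDocs; docID++) {
      // The context carries the last decompressed chunk across calls.
      String value = forwardIndex.getString(docID, readerContext);
      // ... hand value to the text index creator ...
    }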
diff --git a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
index d0d62e3..659bbb2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter;
+import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -136,4 +137,69 @@ public class VarByteChunkSingleValueReaderWriteTest {
       }
     }
   }
+
+  @Test
+  public void testVarCharWithDifferentSizes() throws Exception {
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10000, 100);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10000, 100);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 2000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 2000000, 10);
+  }
+
+  private void testLargeVarcharHelper(ChunkCompressorFactory.CompressionType compressionType, int numChars, int numDocs)
+      throws Exception {
+    String[] expected = new String[numDocs];
+    Random random = new Random();
+
+    File outFile = new File(TEST_FILE);
+    FileUtils.deleteQuietly(outFile);
+
+    int maxStringLengthInBytes = 0;
+    for (int i = 0; i < numDocs; i++) {
+      expected[i] = RandomStringUtils.random(random.nextInt(numChars));
+      maxStringLengthInBytes = Math.max(maxStringLengthInBytes, expected[i].getBytes(UTF_8).length);
+    }
+
+    int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes);
+    VarByteChunkSingleValueWriter writer =
+        new VarByteChunkSingleValueWriter(outFile, compressionType, numDocs, numDocsPerChunk,
+            maxStringLengthInBytes);
+
+    for (int i = 0; i < numDocs; i += 2) {
+      writer.setString(i, expected[i]);
+      writer.setBytes(i + 1, expected[i].getBytes(UTF_8));
+    }
+
+    writer.close();
+
+    try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFile))) {
+      ChunkReaderContext context = reader.createContext();
+
+      for (int i = 0; i < numDocs; i += 2) {
+        String actual = reader.getString(i, context);
+        Assert.assertEquals(actual, expected[i]);
+        Assert.assertEquals(actual.getBytes(UTF_8), expected[i].getBytes(UTF_8));
+        Assert.assertEquals(reader.getBytes(i + 1), expected[i].getBytes(UTF_8));
+      }
+    }
+
+    FileUtils.deleteQuietly(outFile);
+  }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]