This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch hotfix_chunkwriter_realtime
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 34080af27f2b94f24ea54aa488c767eff846879f
Author: Sidd <siddharthteo...@gmail.com>
AuthorDate: Thu Apr 16 22:29:17 2020 -0700

    Derive num docs per chunk from max column value length for varbyte raw index creator (#5256)
    
    * Derive numDocsPerChunk from max column value length
    for var byte raw forward index creator
    
    * review comments
    
    Co-authored-by: Siddharth Teotia <steo...@steotia-mn1.linkedin.biz>
---
 .../impl/v1/VarByteChunkSingleValueReader.java     |  4 +-
 .../impl/v1/VarByteChunkSingleValueWriter.java     |  7 ++-
 .../fwd/SingleValueVarByteRawIndexCreator.java     | 11 +++-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  3 +-
 .../loader/invertedindex/TextIndexHandler.java     |  4 +-
 .../VarByteChunkSingleValueReaderWriteTest.java    | 66 ++++++++++++++++++++++
 6 files changed, 85 insertions(+), 10 deletions(-)
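
Why this change matters: the writer previously used a fixed 1000 docs per
chunk, so the chunk buffer grew linearly with the longest column value; the
patch derives docs-per-chunk from the longest value so each chunk stays near a
1 MB target. A minimal sketch of the before/after sizing math (the ~1 MB entry
length is an assumed value for illustration):

    // Constants taken from the patch below.
    int targetMaxChunkSize = 1024 * 1024; // TARGET_MAX_CHUNK_SIZE
    int offsetSize = Integer.BYTES;       // CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE
    int lengthOfLongestEntry = 1_000_000; // assumed: column contains a ~1 MB value

    // Before: fixed 1000 docs/chunk -> ~1 GB chunk buffer for this column.
    long oldChunkSize = 1000L * (offsetSize + lengthOfLongestEntry);

    // After: docs/chunk derived from the longest entry -> chunk stays near 1 MB.
    int numDocsPerChunk = Math.max(targetMaxChunkSize / (lengthOfLongestEntry + offsetSize), 1);
    long newChunkSize = (long) numDocsPerChunk * (offsetSize + lengthOfLongestEntry);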

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
index 1ebbe32..82fcb2b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
@@ -42,9 +42,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader {
    */
   public VarByteChunkSingleValueReader(PinotDataBuffer pinotDataBuffer) {
     super(pinotDataBuffer);
-
-    int chunkHeaderSize = _numDocsPerChunk * Integer.BYTES;
-    _maxChunkSize = chunkHeaderSize + (_lengthOfLongestEntry * _numDocsPerChunk);
+    _maxChunkSize = _numDocsPerChunk * (VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
index 2c6a299..950c003 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
@@ -50,6 +50,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 @NotThreadSafe
 public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
   private static final int CURRENT_VERSION = 2;
+  public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
 
   private final int _chunkHeaderSize;
   private int _chunkHeaderOffset;
@@ -70,11 +71,11 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
       throws FileNotFoundException {
 
     super(file, compressionType, totalDocs, numDocsPerChunk,
-        ((numDocsPerChunk * Integer.BYTES) + (lengthOfLongestEntry * numDocsPerChunk)), // chunkSize
+        numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize
         lengthOfLongestEntry, CURRENT_VERSION);
 
     _chunkHeaderOffset = 0;
-    _chunkHeaderSize = numDocsPerChunk * Integer.BYTES;
+    _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
     _chunkDataOffSet = _chunkHeaderSize;
   }
 
@@ -87,7 +88,7 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
   @Override
   public void setBytes(int row, byte[] bytes) {
     _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
-    _chunkHeaderOffset += Integer.BYTES;
+    _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
 
     _chunkBuffer.position(_chunkDataOffSet);
     _chunkBuffer.put(bytes);
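
The new constant makes the chunk layout explicit: a header of one int row
offset per doc, followed by the variable-length values, so the worst case on
both the writer and reader side is numDocsPerChunk *
(CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry). A sketch of the
layout setBytes() produces:

    // Per-chunk layout (derived from the writer code above):
    //   header: numDocsPerChunk * Integer.BYTES bytes -- one data offset per row
    //   data:   up to numDocsPerChunk * lengthOfLongestEntry bytes of values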
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
index 34879bd..a8d7e6b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator.impl.fwd;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
@@ -27,7 +28,7 @@ import org.apache.pinot.core.segment.creator.impl.V1Constants;
 
 
 public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCreator {
-  private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive this based on metadata.
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
 
   private final VarByteChunkSingleValueWriter _indexWriter;
 
@@ -35,7 +36,13 @@ public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCr
       String column, int totalDocs, int maxLength)
       throws IOException {
     File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
-    _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, maxLength);
+    _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, getNumDocsPerChunk(maxLength), maxLength);
+  }
+
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry = lengthOfLongestEntry + VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
   }
 
   @Override
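
With the 1 MB target, getNumDocsPerChunk shrinks as values grow and bottoms out
at one doc per chunk once a single entry reaches the target size. Sample
values, worked out from the formula above:

    SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(10);        // 1048576 / 14 -> 74898
    SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(1000);      // 1048576 / 1004 -> 1044
    SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(1_000_000); // 1048576 / 1000004 -> 1
    SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(2_000_000); // quotient 0, clamped to 1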
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 447489d..df8b6cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.index.loader.defaultcolumn;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -338,7 +339,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     int totalDocs = _segmentMetadata.getTotalDocs();
     Object defaultValue = fieldSpec.getDefaultNullValue();
     String stringDefaultValue = (String) defaultValue;
-    int lengthOfLongestEntry = stringDefaultValue.length();
+    int lengthOfLongestEntry = StringUtil.encodeUtf8(stringDefaultValue).length;
     int dictionaryElementSize = 0;
 
     SingleValueVarByteRawIndexCreator rawIndexCreator =
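
Using the UTF-8 encoded byte length instead of String.length() matters because
the writer sizes its chunks in bytes, not chars, and the two differ for
non-ASCII defaults. A quick illustration with the JDK equivalent of
StringUtil.encodeUtf8 (the default values here are assumed):

    "null".length();                                // 4 chars
    "null".getBytes(StandardCharsets.UTF_8).length; // 4 bytes (ASCII: counts agree)

    "нет".length();                                 // 3 chars
    "нет".getBytes(StandardCharsets.UTF_8).length;  // 6 bytes -- char count would undersize the chunk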
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
index 1c1786f..a11b25c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -44,6 +44,7 @@ import javax.annotation.Nonnull;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.segment.creator.TextIndexType;
 import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
@@ -162,8 +163,9 @@ public class TextIndexHandler {
     try (LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
       try (DataFileReader forwardIndexReader = getForwardIndexReader(columnMetadata)) {
         VarByteChunkSingleValueReader forwardIndex = (VarByteChunkSingleValueReader) forwardIndexReader;
+        ChunkReaderContext readerContext = forwardIndex.createContext();
         for (int docID = 0; docID < numDocs; docID++) {
-          Object docToAdd = forwardIndex.getString(docID);
+          Object docToAdd = forwardIndex.getString(docID, readerContext);
           textIndexCreator.addDoc(docToAdd, docID);
         }
         textIndexCreator.seal();
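
Passing a reusable ChunkReaderContext into getString() suits this sequential
scan: the context holds the decompressed-chunk buffer, so consecutive docIDs
that fall in the same chunk avoid a fresh decompression per lookup. A sketch of
the access pattern, assuming the context caches the most recently read chunk:

    ChunkReaderContext readerContext = forwardIndex.createContext();
    for (int docID = 0; docID < numDocs; docID++) {
      // Sequential docIDs mostly hit the already-decompressed chunk.
      String docToAdd = forwardIndex.getString(docID, readerContext);
    }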
diff --git a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
index d0d62e3..659bbb2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
 import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter;
+import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -136,4 +137,69 @@ public class VarByteChunkSingleValueReaderWriteTest {
       }
     }
   }
+
+  @Test
+  public void testVarCharWithDifferentSizes() throws Exception {
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000, 1000);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000, 1000);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10000, 100);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10000, 100);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000000, 10);
+
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 2000000, 10);
+    testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 2000000, 10);
+  }
+
+  private void testLargeVarcharHelper(ChunkCompressorFactory.CompressionType compressionType, int numChars, int numDocs)
+      throws Exception {
+    String[] expected = new String[numDocs];
+    Random random = new Random();
+
+    File outFile = new File(TEST_FILE);
+    FileUtils.deleteQuietly(outFile);
+
+    int maxStringLengthInBytes = 0;
+    for (int i = 0; i < numDocs; i++) {
+      expected[i] = RandomStringUtils.random(random.nextInt(numChars));
+      maxStringLengthInBytes = Math.max(maxStringLengthInBytes, expected[i].getBytes(UTF_8).length);
+    }
+
+    int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes);
+    VarByteChunkSingleValueWriter writer =
+        new VarByteChunkSingleValueWriter(outFile, compressionType, numDocs, numDocsPerChunk,
+            maxStringLengthInBytes);
+
+    for (int i = 0; i < numDocs; i += 2) {
+      writer.setString(i, expected[i]);
+      writer.setBytes(i + 1, expected[i].getBytes(UTF_8));
+    }
+
+    writer.close();
+
+    try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFile))) {
+      ChunkReaderContext context = reader.createContext();
+
+      for (int i = 0; i < numDocs; i += 2) {
+        String actual = reader.getString(i, context);
+        Assert.assertEquals(actual, expected[i]);
+        Assert.assertEquals(actual.getBytes(UTF_8), expected[i].getBytes(UTF_8));
+        Assert.assertEquals(reader.getBytes(i + 1), expected[i].getBytes(UTF_8));
+      }
+    }
+
+    FileUtils.deleteQuietly(outFile);
+  }
 }

