This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aed1307 Add MV raw forward index and MV `BYTES` data type (#7595)
aed1307 is described below
commit aed13072dac0d8dae29056fac77f0f457be7adba
Author: Richard Startin <[email protected]>
AuthorDate: Fri Oct 22 15:50:36 2021 +0100
Add MV raw forward index and MV `BYTES` data type (#7595)
* Initial code for MultiValue forward Index
* Wiring in the segment creation driver Impl
* cleanup
* finish off adding BYTES_ARRAY type
* use less memory and fewer passes during encoding
* reduce memory requirement for forwardindexwriter
* track size in bytes of largest row so chunks can be sized to accommodate
it
* remove TODOs
* force derivation of number of docs for raw MV columns
* specify character encoding
* leave changes to integration tests to MV TEXT index implementation
* fix javadoc
* don't use StringUtils
* fix formatting after rebase
* fix javadoc formatting again
* use zstd's compress bound
Co-authored-by: kishoreg <[email protected]>
---
.../org/apache/pinot/common/utils/DataSchema.java | 9 +-
.../apache/pinot/common/utils/PinotDataType.java | 37 +++-
.../apache/pinot/common/utils/DataSchemaTest.java | 19 +-
.../pinot/common/utils/PinotDataTypeTest.java | 6 +-
.../pinot/core/minion/RawIndexConverter.java | 2 +-
.../local/io/compression/LZ4Compressor.java | 5 +
.../io/compression/PassThroughCompressor.java | 5 +
.../local/io/compression/SnappyCompressor.java | 5 +
.../local/io/compression/ZstandardCompressor.java | 5 +
.../writer/impl/BaseChunkSVForwardIndexWriter.java | 60 ++++---
.../impl/FixedByteChunkSVForwardIndexWriter.java | 4 +-
.../impl/VarByteChunkSVForwardIndexWriter.java | 77 ++++++--
.../creator/impl/SegmentColumnarIndexCreator.java | 192 +++++++++++++++++---
.../fwd/MultiValueFixedByteRawIndexCreator.java | 181 +++++++++++++++++++
.../impl/fwd/MultiValueVarByteRawIndexCreator.java | 122 +++++++++++++
.../stats/AbstractColumnStatisticsCollector.java | 5 +
.../stats/BytesColumnPredIndexStatsCollector.java | 44 +++--
.../stats/StringColumnPreIndexStatsCollector.java | 10 ++
.../forward/VarByteChunkMVForwardIndexReader.java | 193 +++++++++++++++++++++
.../local/segment/store/FilePerIndexDirectory.java | 6 +-
.../MultiValueVarByteRawIndexCreatorTest.java | 141 +++++++++++++++
.../segment/index/creator/RawIndexCreatorTest.java | 135 +++++++++++---
.../org/apache/pinot/segment/spi/V1Constants.java | 1 +
.../segment/spi/compression/ChunkCompressor.java | 2 +
.../spi/creator/ColumnIndexCreationInfo.java | 4 +
.../segment/spi/creator/ColumnStatistics.java | 7 +
.../spi/index/creator/ForwardIndexCreator.java | 9 +
.../spi/index/reader/ForwardIndexReader.java | 19 ++
.../converter/DictionaryToRawIndexConverter.java | 2 +-
29 files changed, 1187 insertions(+), 120 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 6b61cfc..37fb392 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -255,12 +255,13 @@ public class DataSchema {
DOUBLE_ARRAY,
BOOLEAN_ARRAY /* Stored as INT_ARRAY */,
TIMESTAMP_ARRAY /* Stored as LONG_ARRAY */,
+ BYTES_ARRAY,
STRING_ARRAY;
private static final EnumSet<ColumnDataType> NUMERIC_TYPES =
EnumSet.of(INT, LONG, FLOAT, DOUBLE);
private static final EnumSet<ColumnDataType> INTEGRAL_TYPES =
EnumSet.of(INT, LONG);
private static final EnumSet<ColumnDataType> ARRAY_TYPES =
EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY,
- DOUBLE_ARRAY, STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY);
+ DOUBLE_ARRAY, STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY,
BYTES_ARRAY);
private static final EnumSet<ColumnDataType> NUMERIC_ARRAY_TYPES =
EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY,
DOUBLE_ARRAY);
private static final EnumSet<ColumnDataType> INTEGRAL_ARRAY_TYPES =
EnumSet.of(INT_ARRAY, LONG_ARRAY);
@@ -368,6 +369,8 @@ public class DataSchema {
return toBooleanArray(value);
case TIMESTAMP_ARRAY:
return toTimestampArray(value);
+ case BYTES_ARRAY:
+ return (byte[][]) value;
default:
throw new IllegalStateException(String.format("Cannot convert: '%s'
to type: %s", value, this));
}
@@ -424,6 +427,8 @@ public class DataSchema {
return toBooleanArray(value);
case TIMESTAMP_ARRAY:
return formatTimestampArray(value);
+ case BYTES_ARRAY:
+ return (byte[][]) value;
default:
throw new IllegalStateException(String.format("Cannot convert and
format: '%s' to type: %s", value, this));
}
@@ -541,6 +546,8 @@ public class DataSchema {
return BOOLEAN_ARRAY;
case TIMESTAMP:
return TIMESTAMP_ARRAY;
+ case BYTES:
+ return BYTES_ARRAY;
default:
throw new IllegalStateException("Unsupported data type: " +
dataType);
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index f64bf4a..5352820 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -784,6 +784,13 @@ public enum PinotDataType {
}
},
+ BYTES_ARRAY {
+ @Override
+ public byte[][] convert(Object value, PinotDataType sourceType) {
+ return sourceType.toBytesArray(value);
+ }
+ },
+
OBJECT_ARRAY;
/**
@@ -1034,6 +1041,24 @@ public enum PinotDataType {
}
}
+ public byte[][] toBytesArray(Object value) {
+ if (value instanceof byte[][]) {
+ return (byte[][]) value;
+ }
+ if (isSingleValue()) {
+ return new byte[][]{toBytes(value)};
+ } else {
+ Object[] valueArray = toObjectArray(value);
+ int length = valueArray.length;
+ byte[][] bytesArray = new byte[length][];
+ PinotDataType singleValueType = getSingleValueType();
+ for (int i = 0; i < length; i++) {
+ bytesArray[i] = singleValueType.toBytes(valueArray[i]);
+ }
+ return bytesArray;
+ }
+ }
+
private static Object[] toObjectArray(Object array) {
Class<?> componentType = array.getClass().getComponentType();
if (componentType.isPrimitive()) {
@@ -1132,6 +1157,8 @@ public enum PinotDataType {
return DOUBLE;
case STRING_ARRAY:
return STRING;
+ case BYTES_ARRAY:
+ return BYTES;
case OBJECT_ARRAY:
return OBJECT;
case BOOLEAN_ARRAY:
@@ -1205,6 +1232,9 @@ public enum PinotDataType {
if (cls == Short.class) {
return SHORT_ARRAY;
}
+ if (cls == byte[].class) {
+ return BYTES_ARRAY;
+ }
if (cls == Boolean.class) {
return BOOLEAN_ARRAY;
}
@@ -1233,7 +1263,6 @@ public enum PinotDataType {
/**
* Returns the {@link PinotDataType} for the given {@link FieldSpec} for
data ingestion purpose. Returns object array
* type for multi-valued types.
- * TODO: Add MV support for BYTES
*/
public static PinotDataType getPinotDataTypeForIngestion(FieldSpec
fieldSpec) {
DataType dataType = fieldSpec.getDataType();
@@ -1259,11 +1288,7 @@ public enum PinotDataType {
case STRING:
return fieldSpec.isSingleValueField() ? STRING : STRING_ARRAY;
case BYTES:
- if (fieldSpec.isSingleValueField()) {
- return BYTES;
- } else {
- throw new IllegalStateException("There is no multi-value type for
BYTES");
- }
+ return fieldSpec.isSingleValueField() ? BYTES : BYTES_ARRAY;
default:
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " in field: " +
fieldSpec.getName());
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index 421e2ea..04355b8 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -29,18 +29,18 @@ import static
org.apache.pinot.common.utils.DataSchema.ColumnDataType.*;
public class DataSchemaTest {
private static final String[] COLUMN_NAMES = {
"int", "long", "float", "double", "string", "object", "int_array",
"long_array", "float_array", "double_array",
- "string_array", "boolean_array", "timestamp_array"
+ "string_array", "boolean_array", "timestamp_array", "bytes_array"
};
private static final int NUM_COLUMNS = COLUMN_NAMES.length;
private static final DataSchema.ColumnDataType[] COLUMN_DATA_TYPES =
{INT, LONG, FLOAT, DOUBLE, STRING, OBJECT, INT_ARRAY, LONG_ARRAY,
FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
- BOOLEAN_ARRAY, TIMESTAMP_ARRAY};
+ BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
private static final DataSchema.ColumnDataType[]
COMPATIBLE_COLUMN_DATA_TYPES =
{LONG, FLOAT, DOUBLE, INT, STRING, OBJECT, LONG_ARRAY, FLOAT_ARRAY,
DOUBLE_ARRAY, INT_ARRAY, STRING_ARRAY,
- BOOLEAN_ARRAY, TIMESTAMP_ARRAY};
+ BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
private static final DataSchema.ColumnDataType[] UPGRADED_COLUMN_DATA_TYPES
= {
LONG, DOUBLE, DOUBLE, DOUBLE, STRING, OBJECT, LONG_ARRAY, DOUBLE_ARRAY,
DOUBLE_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
- BOOLEAN_ARRAY, TIMESTAMP_ARRAY
+ BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY
};
@Test
@@ -92,7 +92,7 @@ public class DataSchemaTest {
Assert.assertEquals(dataSchema.toString(),
"[int(INT),long(LONG),float(FLOAT),double(DOUBLE),string(STRING),object(OBJECT),int_array(INT_ARRAY),"
+
"long_array(LONG_ARRAY),float_array(FLOAT_ARRAY),double_array(DOUBLE_ARRAY),string_array(STRING_ARRAY),"
- +
"boolean_array(BOOLEAN_ARRAY),timestamp_array(TIMESTAMP_ARRAY)]");
+ +
"boolean_array(BOOLEAN_ARRAY),timestamp_array(TIMESTAMP_ARRAY),bytes_array(BYTES_ARRAY)]");
}
@Test
@@ -107,6 +107,7 @@ public class DataSchemaTest {
Assert.assertFalse(columnDataType.isCompatible(STRING));
Assert.assertFalse(columnDataType.isCompatible(DOUBLE_ARRAY));
Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+ Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
}
for (DataSchema.ColumnDataType columnDataType : new
DataSchema.ColumnDataType[]{FLOAT, DOUBLE}) {
@@ -119,6 +120,7 @@ public class DataSchemaTest {
Assert.assertFalse(columnDataType.isCompatible(STRING));
Assert.assertFalse(columnDataType.isCompatible(LONG_ARRAY));
Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+ Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
}
Assert.assertFalse(STRING.isNumber());
@@ -130,6 +132,7 @@ public class DataSchemaTest {
Assert.assertTrue(STRING.isCompatible(STRING));
Assert.assertFalse(STRING.isCompatible(DOUBLE_ARRAY));
Assert.assertFalse(STRING.isCompatible(STRING_ARRAY));
+ Assert.assertFalse(STRING.isCompatible(BYTES_ARRAY));
Assert.assertFalse(OBJECT.isNumber());
Assert.assertFalse(OBJECT.isWholeNumber());
@@ -140,6 +143,7 @@ public class DataSchemaTest {
Assert.assertFalse(OBJECT.isCompatible(STRING));
Assert.assertFalse(OBJECT.isCompatible(DOUBLE_ARRAY));
Assert.assertFalse(OBJECT.isCompatible(STRING_ARRAY));
+ Assert.assertFalse(OBJECT.isCompatible(BYTES_ARRAY));
Assert.assertTrue(OBJECT.isCompatible(OBJECT));
for (DataSchema.ColumnDataType columnDataType : new
DataSchema.ColumnDataType[]{INT_ARRAY, LONG_ARRAY}) {
@@ -152,6 +156,7 @@ public class DataSchemaTest {
Assert.assertFalse(columnDataType.isCompatible(STRING));
Assert.assertTrue(columnDataType.isCompatible(DOUBLE_ARRAY));
Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+ Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
}
for (DataSchema.ColumnDataType columnDataType : new
DataSchema.ColumnDataType[]{FLOAT_ARRAY, DOUBLE_ARRAY}) {
@@ -164,10 +169,11 @@ public class DataSchemaTest {
Assert.assertFalse(columnDataType.isCompatible(STRING));
Assert.assertTrue(columnDataType.isCompatible(LONG_ARRAY));
Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+ Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
}
for (DataSchema.ColumnDataType columnDataType : new
DataSchema.ColumnDataType[]{STRING_ARRAY, BOOLEAN_ARRAY,
- TIMESTAMP_ARRAY}) {
+ TIMESTAMP_ARRAY, BYTES_ARRAY}) {
Assert.assertFalse(columnDataType.isNumber());
Assert.assertFalse(columnDataType.isWholeNumber());
Assert.assertTrue(columnDataType.isArray());
@@ -192,5 +198,6 @@ public class DataSchemaTest {
Assert.assertEquals(fromDataType(FieldSpec.DataType.STRING, false),
STRING_ARRAY);
Assert.assertEquals(fromDataType(FieldSpec.DataType.BOOLEAN, false),
BOOLEAN_ARRAY);
Assert.assertEquals(fromDataType(FieldSpec.DataType.TIMESTAMP, false),
TIMESTAMP_ARRAY);
+ Assert.assertEquals(fromDataType(FieldSpec.DataType.BYTES, false),
BYTES_ARRAY);
}
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
index 649bb9e..1ddb766 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pinot.common.utils.PinotDataType.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -130,7 +131,9 @@ public class PinotDataTypeTest {
{LONG_ARRAY, TIMESTAMP_ARRAY, new long[] {1000000L, 2000000L},
new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L)
}},
{TIMESTAMP_ARRAY, TIMESTAMP_ARRAY, new Timestamp[] { new
Timestamp(1000000L), new Timestamp(2000000L) },
- new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }}
+ new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }},
+ {BYTES_ARRAY, BYTES_ARRAY, new byte[][] { "foo".getBytes(UTF_8),
"bar".getBytes(UTF_8) },
+ new byte[][] { "foo".getBytes(UTF_8), "bar".getBytes(UTF_8) }}
};
}
@@ -257,6 +260,7 @@ public class PinotDataTypeTest {
testCases.put(String.class, STRING_ARRAY);
testCases.put(Boolean.class, BOOLEAN_ARRAY);
testCases.put(Timestamp.class, TIMESTAMP_ARRAY);
+ testCases.put(byte[].class, BYTES_ARRAY);
for (Map.Entry<Class<?>, PinotDataType> tc : testCases.entrySet()) {
assertEquals(getMultiValueType(tc.getKey()), tc.getValue());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index f1c7fbb..a3b2e24 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -207,7 +207,7 @@ public class RawIndexConverter {
int numDocs = _originalSegmentMetadata.getTotalDocs();
int lengthOfLongestEntry =
_originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
- .getRawIndexCreatorForColumn(_convertedIndexDir,
ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
+ .getRawIndexCreatorForSVColumn(_convertedIndexDir,
ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
lengthOfLongestEntry, false,
BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
ForwardIndexReaderContext readerContext = reader.createContext()) {
switch (storedType) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
index e0198b1..bc9de7a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
@@ -48,4 +48,9 @@ public class LZ4Compressor implements ChunkCompressor {
outCompressed.flip();
return outCompressed.limit();
}
+
+ @Override
+ public int maxCompressedSize(int uncompressedSize) {
+ return _lz4Factory.fastCompressor().maxCompressedLength(uncompressedSize);
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
index 30c69c8..b7d876b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
@@ -39,4 +39,9 @@ public class PassThroughCompressor implements ChunkCompressor
{
outCompressed.flip();
return outCompressed.limit();
}
+
+ @Override
+ public int maxCompressedSize(int uncompressedSize) {
+ return uncompressedSize;
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
index e183db7..0b87afe 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
@@ -34,4 +34,9 @@ public class SnappyCompressor implements ChunkCompressor {
throws IOException {
return Snappy.compress(inDecompressed, outCompressed);
}
+
+ @Override
+ public int maxCompressedSize(int uncompressedSize) {
+ return Snappy.maxCompressedLength(uncompressedSize);
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
index 33c607c..931f969 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
@@ -40,4 +40,9 @@ public class ZstandardCompressor implements ChunkCompressor {
outCompressed.flip();
return compressedSize;
}
+
+ @Override
+ public int maxCompressedSize(int uncompressedSize) {
+ return (int) Zstd.compressBound(uncompressedSize);
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
index 5be72b0..1e92e1f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
@@ -21,14 +21,16 @@ package org.apache.pinot.segment.local.io.writer.impl;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +47,10 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 =
Integer.BYTES;
private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES;
- protected final FileChannel _dataFile;
- protected ByteBuffer _header;
+ private final File _file;
+ private final FileChannel _dataChannel;
+ private final ByteBuffer _header;
protected final ByteBuffer _chunkBuffer;
- protected final ByteBuffer _compressedBuffer;
protected final ChunkCompressor _chunkCompressor;
protected int _chunkSize;
@@ -66,19 +68,21 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
* @param chunkSize Size of chunk
* @param sizeOfEntry Size of entry (in bytes), max size for variable byte
implementation.
* @param version version of File
- * @throws FileNotFoundException
+ * @throws IOException if the file isn't found or can't be mapped
*/
protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
- throws FileNotFoundException {
+ throws IOException {
Preconditions.checkArgument(version == DEFAULT_VERSION || version ==
CURRENT_VERSION);
+ _file = file;
+ _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
+ _dataOffset = headerSize(totalDocs, numDocsPerChunk,
_headerEntryChunkOffsetSize);
_chunkSize = chunkSize;
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
- _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
- _dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk,
sizeOfEntry, version);
_chunkBuffer = ByteBuffer.allocateDirect(chunkSize);
- _compressedBuffer = ByteBuffer.allocateDirect(chunkSize * 2);
- _dataFile = new RandomAccessFile(file, "rw").getChannel();
+ _dataChannel = new RandomAccessFile(file, "rw").getChannel();
+ _header = _dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, _dataOffset);
+ writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry,
version);
}
public static int getHeaderEntryChunkOffsetSize(int version) {
@@ -102,10 +106,13 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
writeChunk();
}
- // Write the header and close the file.
- _header.flip();
- _dataFile.write(_header, 0);
- _dataFile.close();
+ if (CleanerUtil.UNMAP_SUPPORTED) {
+ CleanerUtil.getCleaner().freeBuffer(_header);
+ }
+
+ // we will have overmapped by (maxCompressedSize - actualCompressedSize)
for the most recent chunk
+ _dataChannel.truncate(_dataOffset);
+ _dataChannel.close();
}
/**
@@ -116,14 +123,10 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
* @param numDocsPerChunk Number of documents per chunk
* @param sizeOfEntry Size of each entry
* @param version Version of file
- * @return Size of header
*/
- private int writeHeader(ChunkCompressionType compressionType, int totalDocs,
int numDocsPerChunk, int sizeOfEntry,
+ private void writeHeader(ChunkCompressionType compressionType, int
totalDocs, int numDocsPerChunk, int sizeOfEntry,
int version) {
int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk;
- int headerSize = (7 * Integer.BYTES) + (numChunks *
_headerEntryChunkOffsetSize);
-
- _header = ByteBuffer.allocateDirect(headerSize);
int offset = 0;
_header.putInt(version);
@@ -151,8 +154,11 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
int dataHeaderStart = offset + Integer.BYTES;
_header.putInt(dataHeaderStart);
}
+ }
- return headerSize;
+ private static int headerSize(int totalDocs, int numDocsPerChunk, int
headerEntryChunkOffsetSize) {
+ int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk;
+ return (7 * Integer.BYTES) + (numChunks * headerEntryChunkOffsetSize);
}
/**
@@ -166,13 +172,15 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
*
*/
protected void writeChunk() {
- int sizeToWrite;
+ int sizeWritten;
_chunkBuffer.flip();
- try {
- sizeToWrite = _chunkCompressor.compress(_chunkBuffer, _compressedBuffer);
- _dataFile.write(_compressedBuffer, _dataOffset);
- _compressedBuffer.clear();
+ int maxCompressedSize =
_chunkCompressor.maxCompressedSize(_chunkBuffer.limit());
+ // compress directly into the mapped output rather than keeping a large
buffer to compress into
+ try (PinotDataBuffer compressedBuffer = PinotDataBuffer.mapFile(_file,
false, _dataOffset,
+ maxCompressedSize, ByteOrder.BIG_ENDIAN, "forward index chunk")) {
+ ByteBuffer view = compressedBuffer.toDirectByteBuffer(0,
maxCompressedSize);
+ sizeWritten = _chunkCompressor.compress(_chunkBuffer, view);
} catch (IOException e) {
LOGGER.error("Exception caught while compressing/writing data chunk", e);
throw new RuntimeException(e);
@@ -184,7 +192,7 @@ public abstract class BaseChunkSVForwardIndexWriter
implements Closeable {
_header.putLong(_dataOffset);
}
- _dataOffset += sizeToWrite;
+ _dataOffset += sizeWritten;
_chunkBuffer.clear();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
index 359c48e..8d9ad7e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.io.writer.impl;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -65,10 +66,11 @@ public class FixedByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexW
* @param sizeOfEntry Size of entry (in bytes)
* @param writerVersion writer format version
* @throws FileNotFoundException Throws {@link FileNotFoundException} if the
specified file is not found.
+ * @throws IOException Throws {@link IOException} if there are any errors
mapping the underlying ByteBuffer.
*/
public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
int numDocsPerChunk, int sizeOfEntry, int writerVersion)
- throws FileNotFoundException {
+ throws IOException {
super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry *
numDocsPerChunk), sizeOfEntry,
writerVersion);
_chunkDataOffset = 0;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index c06e528..fed1200 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -54,6 +54,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
*/
@NotThreadSafe
public class VarByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexWriter {
+
public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
private final int _chunkHeaderSize;
@@ -69,11 +70,13 @@ public class VarByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexWri
* @param numDocsPerChunk Number of documents per chunk.
* @param lengthOfLongestEntry Length of longest entry (in bytes)
* @param writerVersion writer format version
- * @throws FileNotFoundException Throws {@link FileNotFoundException} if the
specified file is not found.
+ * @throws IOException Throws {@link IOException} if the specified file is
+ * not found or cannot be mapped.
*/
- public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType, int totalDocs,
+ public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType
compressionType,
+ int totalDocs,
int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
- throws FileNotFoundException {
+ throws IOException {
super(file, compressionType, totalDocs, numDocsPerChunk,
numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE +
lengthOfLongestEntry),
// chunkSize
@@ -96,25 +99,66 @@ public class VarByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexWri
_chunkBuffer.put(value);
_chunkDataOffSet += value.length;
- // If buffer filled, then compress and write to file.
- if (_chunkHeaderOffset == _chunkHeaderSize) {
- writeChunk();
+ writeChunkIfNecessary();
+ }
+
+ // Note: some duplication is tolerated between these overloads for the sake
of memory efficiency
+
+ public void putStrings(String[] values) {
+ // the entire String[] will be encoded as a single string, write the
header here
+ _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
+ _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ // write all the strings into the data buffer as if it's a single string,
+ // but with its own embedded header so offsets to strings within the body
+ // can be located
+ int headerPosition = _chunkDataOffSet;
+ int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+ int bodyPosition = headerPosition + headerSize;
+ _chunkBuffer.position(bodyPosition);
+ int bodySize = 0;
+ for (int i = 0, h = headerPosition + Integer.BYTES; i < values.length;
i++, h += Integer.BYTES) {
+ byte[] utf8 = values[i].getBytes(UTF_8);
+ _chunkBuffer.putInt(h, utf8.length);
+ _chunkBuffer.put(utf8);
+ bodySize += utf8.length;
}
+ _chunkDataOffSet += headerSize + bodySize;
+ // go back to write the number of strings embedded in the big string
+ _chunkBuffer.putInt(headerPosition, values.length);
+
+ writeChunkIfNecessary();
}
- @Override
- public void close()
- throws IOException {
+ public void putByteArrays(byte[][] values) {
+ // the entire byte[][] will be encoded as a single byte[]; write the
header here
+ _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
+ _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ // write all the byte[]s into the data buffer as if it's a single byte[],
+ // but with its own embedded header so offsets to byte[]s within the body
+ // can be located
+ int headerPosition = _chunkDataOffSet;
+ int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+ int bodyPosition = headerPosition + headerSize;
+ _chunkBuffer.position(bodyPosition);
+ int bodySize = 0;
+ for (int i = 0, h = headerPosition + Integer.BYTES; i < values.length;
i++, h += Integer.BYTES) {
+ byte[] utf8 = values[i];
+ _chunkBuffer.putInt(h, utf8.length);
+ _chunkBuffer.put(utf8);
+ bodySize += utf8.length;
+ }
+ _chunkDataOffSet += headerSize + bodySize;
+ // go back to write the number of byte[]s embedded in the big byte[]
+ _chunkBuffer.putInt(headerPosition, values.length);
+
+ writeChunkIfNecessary();
+ }
- // Write the chunk if it is non-empty.
- if (_chunkBuffer.position() > 0) {
+ private void writeChunkIfNecessary() {
+ // If buffer filled, then compress and write to file.
+ if (_chunkHeaderOffset == _chunkHeaderSize) {
writeChunk();
}
-
- // Write the header and close the file.
- _header.flip();
- _dataFile.write(_header, 0);
- _dataFile.close();
}
/**
@@ -125,7 +169,6 @@ public class VarByteChunkSVForwardIndexWriter extends
BaseChunkSVForwardIndexWri
* <li> Updates the header with the current chunks offset. </li>
* <li> Clears up the buffers, so that they can be reused. </li>
* </ul>
- *
*/
protected void writeChunk() {
// For partially filled chunks, we still need to clear the offsets for
remaining rows, as we reuse this buffer.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2829e8e..dcc0ea2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,7 +35,9 @@ import
org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.utils.FileUtils;
import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -228,10 +231,6 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
}
} else {
// Create raw index
-
- // TODO: add support to multi-value column and inverted index
- Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot
create raw index for multi-value column: %s",
- columnName);
Preconditions.checkState(!invertedIndexColumns.contains(columnName),
"Cannot create inverted index for raw index column: %s",
columnName);
@@ -241,9 +240,16 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
boolean deriveNumDocsPerChunk =
shouldDeriveNumDocsPerChunk(columnName,
segmentCreationSpec.getColumnProperties());
int writerVersion = rawIndexWriterVersion(columnName,
segmentCreationSpec.getColumnProperties());
- _forwardIndexCreatorMap.put(columnName,
- getRawIndexCreatorForColumn(_indexDir, compressionType,
columnName, storedType, _totalDocs,
- indexCreationInfo.getLengthOfLongestEntry(),
deriveNumDocsPerChunk, writerVersion));
+ if (fieldSpec.isSingleValueField()) {
+ _forwardIndexCreatorMap.put(columnName,
+ getRawIndexCreatorForSVColumn(_indexDir, compressionType,
columnName, storedType, _totalDocs,
+ indexCreationInfo.getLengthOfLongestEntry(),
deriveNumDocsPerChunk, writerVersion));
+ } else {
+ _forwardIndexCreatorMap.put(columnName,
+ getRawIndexCreatorForMVColumn(_indexDir, compressionType,
columnName, storedType, _totalDocs,
+ indexCreationInfo.getMaxNumberOfMultiValueElements(),
deriveNumDocsPerChunk, writerVersion,
+ indexCreationInfo.getMaxRowLengthInBytes()));
+ }
}
if (textIndexColumns.contains(columnName)) {
@@ -366,10 +372,6 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
String column = spec.getName();
if (config.getRawIndexCreationColumns().contains(column) ||
config.getRawIndexCompressionType()
.containsKey(column)) {
- if (!spec.isSingleValueField()) {
- throw new RuntimeException(
- "Creation of indices without dictionaries is supported for single
valued columns only.");
- }
return false;
}
return info.isCreateDictionary();
@@ -387,16 +389,19 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
throw new RuntimeException("Null value for column:" + columnName);
}
- boolean isSingleValue =
_schema.getFieldSpecFor(columnName).isSingleValueField();
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+ //get dictionaryCreator, will be null if column is not dictionaryEncoded
SegmentDictionaryCreator dictionaryCreator =
_dictionaryCreatorMap.get(columnName);
- if (isSingleValue) {
- // SV column
- // text-index enabled SV column
- TextIndexCreator textIndexCreator =
_textIndexCreatorMap.get(columnName);
- if (textIndexCreator != null) {
- textIndexCreator.add((String) columnValueToIndex);
- }
+ // text-index
+ TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
+ if (textIndexCreator != null) {
+ textIndexCreator.add((String) columnValueToIndex);
+ }
+
+ if (fieldSpec.isSingleValueField()) {
+ // Single Value column
JsonIndexCreator jsonIndexCreator =
_jsonIndexCreatorMap.get(columnName);
if (jsonIndexCreator != null) {
jsonIndexCreator.add((String) columnValueToIndex);
@@ -452,12 +457,107 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
}
}
} else {
- // MV column (always dictionary encoded)
- int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
- forwardIndexCreator.putDictIdMV(dictIds);
- DictionaryBasedInvertedIndexCreator invertedIndexCreator =
_invertedIndexCreatorMap.get(columnName);
- if (invertedIndexCreator != null) {
- invertedIndexCreator.add(dictIds, dictIds.length);
+ if (dictionaryCreator != null) {
+ //dictionary encoded
+ int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
+ forwardIndexCreator.putDictIdMV(dictIds);
+ DictionaryBasedInvertedIndexCreator invertedIndexCreator =
_invertedIndexCreatorMap
+ .get(columnName);
+ if (invertedIndexCreator != null) {
+ invertedIndexCreator.add(dictIds, dictIds.length);
+ }
+ } else {
+ // for text index on raw columns, check the config to determine if
actual raw value should
+ // be stored or not
+ if (textIndexCreator != null &&
!shouldStoreRawValueForTextIndex(columnName)) {
+ Object value = _columnProperties.get(columnName)
+ .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+ if (value == null) {
+ value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
+ }
+ if (forwardIndexCreator.getValueType().getStoredType() ==
DataType.STRING) {
+ value = String.valueOf(value);
+ int length = ((String[]) columnValueToIndex).length;
+ columnValueToIndex = new String[length];
+ Arrays.fill((String[]) columnValueToIndex, value);
+ } else if (forwardIndexCreator.getValueType().getStoredType() ==
DataType.BYTES) {
+ int length = ((byte[][]) columnValueToIndex).length;
+ columnValueToIndex = new byte[length][];
+ Arrays.fill((byte[][]) columnValueToIndex,
String.valueOf(value).getBytes());
+ } else {
+ throw new RuntimeException("Text Index is only supported for
STRING and BYTES stored type");
+ }
+ }
+ switch (forwardIndexCreator.getValueType()) {
+ case INT:
+ if (columnValueToIndex instanceof int[]) {
+ forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ int[] array = new int[((Object[]) columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putIntMV(array);
+ }
+ break;
+ case LONG:
+ if (columnValueToIndex instanceof long[]) {
+ forwardIndexCreator.putLongMV((long[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ long[] array = new long[((Object[])
columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Long) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putLongMV(array);
+ }
+ break;
+ case FLOAT:
+ if (columnValueToIndex instanceof float[]) {
+ forwardIndexCreator.putFloatMV((float[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ float[] array = new float[((Object[])
columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Float) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putFloatMV(array);
+ }
+ break;
+ case DOUBLE:
+ if (columnValueToIndex instanceof double[]) {
+ forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ double[] array = new double[((Object[])
columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Double) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putDoubleMV(array);
+ }
+ break;
+ case STRING:
+ if (columnValueToIndex instanceof String[]) {
+ forwardIndexCreator.putStringMV((String[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ String[] array = new String[((Object[])
columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (String) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putStringMV(array);
+ }
+ break;
+ case BYTES:
+ if (columnValueToIndex instanceof byte[][]) {
+ forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ byte[][] array = new byte[((Object[])
columnValueToIndex).length][];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (byte[]) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putBytesMV(array);
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
}
}
@@ -734,10 +834,11 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
* @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows per chunk
* @param writerVersion version to use for the raw index writer
* @return raw index creator
- * @throws IOException
*/
- public static ForwardIndexCreator getRawIndexCreatorForColumn(File file,
ChunkCompressionType compressionType,
- String column, DataType dataType, int totalDocs, int
lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
+ public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
+ ChunkCompressionType compressionType,
+ String column, DataType dataType, int totalDocs, int
lengthOfLongestEntry,
+ boolean deriveNumDocsPerChunk,
int writerVersion)
throws IOException {
switch (dataType.getStoredType()) {
@@ -756,6 +857,41 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
}
}
+ /**
+ * Helper method to build the raw index creator for the column.
+   * Assumes that column to be indexed is multi valued.
+ *
+ * @param file Output index file
+ * @param column Column name
+ * @param totalDocs Total number of documents to index
+ * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive
the number of rows
+ * per chunk
+ * @param writerVersion version to use for the raw index writer
+ * @param maxRowLengthInBytes the length of the longest row in bytes
+ * @return raw index creator
+ */
+ public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file,
ChunkCompressionType compressionType,
+ String column, DataType dataType, final int totalDocs,
+ final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
int writerVersion,
+ int maxRowLengthInBytes)
+ throws IOException {
+ switch (dataType.getStoredType()) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new MultiValueFixedByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType,
+ dataType.getStoredType().size(), maxNumberOfMultiValueElements,
deriveNumDocsPerChunk, writerVersion);
+ case STRING:
+ case BYTES:
+ return new MultiValueVarByteRawIndexCreator(file, compressionType,
column, totalDocs, dataType, writerVersion,
+ maxRowLengthInBytes);
+ default:
+ throw new UnsupportedOperationException(
+ "Data type not supported for raw indexing: " + dataType);
+ }
+ }
+
@Override
public void close()
throws IOException {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
new file mode 100644
index 0000000..572c793
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns
of fixed length
+ * data types (INT,
+ * LONG, FLOAT, DOUBLE).
+ */
+public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator
{
+
+ private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+ private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+ private final VarByteChunkSVForwardIndexWriter _indexWriter;
+ private final DataType _valueType;
+
+ /**
+   * Create a fixed-byte raw index creator for the given column
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ */
+ public MultiValueFixedByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType,
+ String column,
+ int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+ final int maxNumberOfMultiValueElements)
+ throws IOException {
+ this(baseIndexDir, compressionType, column, totalDocs, valueType,
maxLengthOfEachEntry,
+ maxNumberOfMultiValueElements, false,
+ BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ }
+
+ /**
+   * Create a fixed-byte raw index creator for the given column
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ * @param maxLengthOfEachEntry length of longest entry (in bytes)
+ * @param deriveNumDocsPerChunk true if writer should auto-derive the number
of rows per chunk
+ * @param writerVersion writer format version
+ */
+ public MultiValueFixedByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType,
+ String column, int totalDocs, DataType valueType, final int
maxLengthOfEachEntry,
+ final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
+ int writerVersion)
+ throws IOException {
+ File file = new File(baseIndexDir,
+ column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ FileUtils.deleteQuietly(file);
+ int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry;
+ int numDocsPerChunk =
+ deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) :
DEFAULT_NUM_DOCS_PER_CHUNK;
+ _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType,
totalDocs,
+ numDocsPerChunk, totalMaxLength, writerVersion);
+ _valueType = valueType;
+ }
+
+ @VisibleForTesting
+ public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+ int overheadPerEntry =
+ lengthOfLongestEntry +
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+ }
+
+ @Override
+ public boolean isDictionaryEncoded() {
+ return false;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return false;
+ }
+
+ @Override
+ public DataType getValueType() {
+ return _valueType;
+ }
+
+ @Override
+ public void putIntMV(final int[] values) {
+
+ byte[] bytes = new byte[Integer.BYTES
+ + values.length * Integer.BYTES]; //numValues, bytes required to store
the content
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ //write the length
+ byteBuffer.putInt(values.length);
+ //write the content of each element
+ for (final int value : values) {
+ byteBuffer.putInt(value);
+ }
+ _indexWriter.putBytes(bytes);
+ }
+
+ @Override
+ public void putLongMV(final long[] values) {
+
+ byte[] bytes = new byte[Integer.BYTES
+ + values.length * Long.BYTES]; //numValues, bytes required to store
the content
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ //write the length
+ byteBuffer.putInt(values.length);
+ //write the content of each element
+ for (final long value : values) {
+ byteBuffer.putLong(value);
+ }
+ _indexWriter.putBytes(bytes);
+ }
+
+ @Override
+ public void putFloatMV(final float[] values) {
+
+ byte[] bytes = new byte[Integer.BYTES
+ + values.length * Float.BYTES]; //numValues, bytes required to store
the content
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ //write the length
+ byteBuffer.putInt(values.length);
+ //write the content of each element
+ for (final float value : values) {
+ byteBuffer.putFloat(value);
+ }
+ _indexWriter.putBytes(bytes);
+ }
+
+ @Override
+ public void putDoubleMV(final double[] values) {
+
+ byte[] bytes = new byte[Integer.BYTES
+ + values.length * Long.BYTES]; //numValues, bytes required to store
the content
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ //write the length
+ byteBuffer.putInt(values.length);
+ //write the content of each element
+ for (final double value : values) {
+ byteBuffer.putDouble(value);
+ }
+ _indexWriter.putBytes(bytes);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _indexWriter.close();
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
new file mode 100644
index 0000000..5d5b3cf
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import java.io.File;
+import java.io.IOException;
+import
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns
of variable length
+ * data types (STRING,
+ * BYTES).
+ */
+public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
+
+ private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+ private final VarByteChunkSVForwardIndexWriter _indexWriter;
+ private final DataType _valueType;
+
+ /**
+ * Create a var-byte raw index creator for the given column
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ * @param maxRowLengthInBytes the length in bytes of the largest row
+ */
+ public MultiValueVarByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType,
+ String column,
+ int totalDocs, DataType valueType, int maxRowLengthInBytes)
+ throws IOException {
+ this(baseIndexDir, compressionType, column, totalDocs, valueType,
+ BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes);
+ }
+
+ /**
+ * Create a var-byte raw index creator for the given column
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ * @param maxRowLengthInBytes the size in bytes of the largest row, the
chunk size cannot be smaller than this
+ * @param writerVersion writer format version
+ */
+ public MultiValueVarByteRawIndexCreator(File baseIndexDir,
ChunkCompressionType compressionType,
+ String column, int totalDocs, DataType valueType, int writerVersion, int
maxRowLengthInBytes)
+ throws IOException {
+ //we will prepend the actual content with numElements and length array
containing length of each element
+ int totalMaxLength = Integer.BYTES + Math.max(maxRowLengthInBytes,
TARGET_MAX_CHUNK_SIZE);
+ File file = new File(baseIndexDir,
+ column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ int numDocsPerChunk = getNumDocsPerChunk(totalMaxLength);
+ _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType,
totalDocs,
+ numDocsPerChunk, totalMaxLength,
+ writerVersion);
+ _valueType = valueType;
+ }
+
+ private static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+ int overheadPerEntry =
+ lengthOfLongestEntry +
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+ }
+
+ @Override
+ public boolean isDictionaryEncoded() {
+ return false;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return false;
+ }
+
+ @Override
+ public DataType getValueType() {
+ return _valueType;
+ }
+
+ @Override
+ public void putStringMV(final String[] values) {
+ _indexWriter.putStrings(values);
+ }
+
+ @Override
+ public void putBytesMV(final byte[][] values) {
+ _indexWriter.putByteArrays(values);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _indexWriter.close();
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 284bf69..6407b55 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -47,6 +47,7 @@ public abstract class AbstractColumnStatisticsCollector
implements ColumnStatist
protected int _totalNumberOfEntries = 0;
protected int _maxNumberOfMultiValues = 0;
+ protected int _maxLengthOfMultiValues = 0;
private PartitionFunction _partitionFunction;
private final int _numPartitions;
private final Set<Integer> _partitions;
@@ -72,6 +73,10 @@ public abstract class AbstractColumnStatisticsCollector
implements ColumnStatist
return _maxNumberOfMultiValues;
}
+ public int getMaxLengthOfMultiValues() {
+ return _maxLengthOfMultiValues;
+ }
+
void addressSorted(Object entry) {
if (_isSorted) {
if (_previousValue != null) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index a0cfd66..411238d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -33,6 +33,7 @@ public class BytesColumnPredIndexStatsCollector extends
AbstractColumnStatistics
private int _minLength = Integer.MAX_VALUE;
private int _maxLength = 0;
+ private int _maxRowLength = 0;
private ByteArray[] _sortedValues;
private boolean _sealed = false;
@@ -42,16 +43,32 @@ public class BytesColumnPredIndexStatsCollector extends
AbstractColumnStatistics
@Override
public void collect(Object entry) {
- ByteArray value = new ByteArray((byte[]) entry);
- addressSorted(value);
- updatePartition(value);
- _values.add(value);
-
- int length = value.length();
- _minLength = Math.min(_minLength, length);
- _maxLength = Math.max(_maxLength, length);
-
- _totalNumberOfEntries++;
+ if (entry instanceof Object[]) {
+ Object[] values = (Object[]) entry;
+ int rowLength = 0;
+ for (Object obj : values) {
+ ByteArray value = new ByteArray((byte[]) obj);
+ _values.add(value);
+ int length = value.length();
+ _minLength = Math.min(_minLength, length);
+ _maxLength = Math.max(_maxLength, length);
+ rowLength += length;
+ }
+ _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues,
values.length);
+ _maxRowLength = Math.max(_maxRowLength, rowLength);
+ updateTotalNumberOfEntries(values);
+ } else {
+ ByteArray value = new ByteArray((byte[]) entry);
+ addressSorted(value);
+ updatePartition(value);
+ _values.add(value);
+
+ int length = value.length();
+ _minLength = Math.min(_minLength, length);
+ _maxLength = Math.max(_maxLength, length);
+ _maxRowLength = _maxLength;
+ _totalNumberOfEntries++;
+ }
}
@Override
@@ -92,6 +109,11 @@ public class BytesColumnPredIndexStatsCollector extends
AbstractColumnStatistics
}
@Override
+ public int getMaxRowLengthInBytes() {
+ return _maxRowLength;
+ }
+
+ @Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
@@ -106,7 +128,7 @@ public class BytesColumnPredIndexStatsCollector extends
AbstractColumnStatistics
@Override
public void seal() {
- _sortedValues = _values.toArray(new ByteArray[_values.size()]);
+ _sortedValues = _values.toArray(new ByteArray[0]);
Arrays.sort(_sortedValues);
_sealed = true;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index 068a11e..1350677 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -31,6 +31,7 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
private int _minLength = Integer.MAX_VALUE;
private int _maxLength = 0;
+ private int _maxRowLength = 0;
private String[] _sortedValues;
private boolean _sealed = false;
@@ -42,6 +43,7 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
public void collect(Object entry) {
if (entry instanceof Object[]) {
Object[] values = (Object[]) entry;
+ int rowLength = 0;
for (Object obj : values) {
String value = (String) obj;
_values.add(value);
@@ -49,9 +51,11 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
int length = value.getBytes(UTF_8).length;
_minLength = Math.min(_minLength, length);
_maxLength = Math.max(_maxLength, length);
+ rowLength += length;
}
_maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues,
values.length);
+ _maxRowLength = Math.max(_maxRowLength, rowLength);
updateTotalNumberOfEntries(values);
} else {
String value = (String) entry;
@@ -62,6 +66,7 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
int valueLength = value.getBytes(UTF_8).length;
_minLength = Math.min(_minLength, valueLength);
_maxLength = Math.max(_maxLength, valueLength);
+ _maxRowLength = _maxLength;
_totalNumberOfEntries++;
}
@@ -100,6 +105,11 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
}
@Override
+ public int getMaxRowLengthInBytes() {
+ return _maxRowLength;
+ }
+
+ @Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..1844957
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader
for values of
+ * variable
+ * length data type
+ * (STRING, BYTES).
+ * <p>For data layout, please refer to the documentation for {@link
VarByteChunkSVForwardIndexWriter}
+ */
+public final class VarByteChunkMVForwardIndexReader extends
BaseChunkSVForwardIndexReader {
+
+ private static final int ROW_OFFSET_SIZE =
VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+ private final int _maxChunkSize;
+
+ public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType
valueType) {
+ super(dataBuffer, valueType);
+ _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE +
_lengthOfLongestEntry);
+ }
+
+ @Nullable
+ @Override
+ public ChunkReaderContext createContext() {
+ if (_isCompressed) {
+ return new ChunkReaderContext(_maxChunkSize);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getStringMV(final int docId, final String[] valueBuffer,
+ final ChunkReaderContext context) {
+ byte[] compressedBytes;
+ if (_isCompressed) {
+ compressedBytes = getBytesCompressed(docId, context);
+ } else {
+ compressedBytes = getBytesUncompressed(docId);
+ }
+ ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+ int numValues = byteBuffer.getInt();
+ int contentOffset = (numValues + 1) * Integer.BYTES;
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+ byte[] bytes = new byte[length];
+ byteBuffer.position(contentOffset);
+ byteBuffer.get(bytes, 0, length);
+ valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+ contentOffset += length;
+ }
+ return numValues;
+ }
+
+ @Override
+ public int getBytesMV(final int docId, final byte[][] valueBuffer,
+ final ChunkReaderContext context) {
+ byte[] compressedBytes;
+ if (_isCompressed) {
+ compressedBytes = getBytesCompressed(docId, context);
+ } else {
+ compressedBytes = getBytesUncompressed(docId);
+ }
+ ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+ int numValues = byteBuffer.getInt();
+ int contentOffset = (numValues + 1) * Integer.BYTES;
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+ byte[] bytes = new byte[length];
+ byteBuffer.position(contentOffset);
+ byteBuffer.get(bytes, 0, length);
+ valueBuffer[i] = bytes;
+ contentOffset += length;
+ }
+ return numValues;
+ }
+
+ @Override
+ public byte[] getBytes(int docId, ChunkReaderContext context) {
+ if (_isCompressed) {
+ return getBytesCompressed(docId, context);
+ } else {
+ return getBytesUncompressed(docId);
+ }
+ }
+
+ /**
+ * Helper method to read BYTES value from the compressed index.
+ */
+ private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
+ int chunkRowId = docId % _numDocsPerChunk;
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+ // These offsets are offset in the chunk buffer
+ int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+ int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+ byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+ chunkBuffer.position(valueStartOffset);
+ chunkBuffer.get(bytes);
+ return bytes;
+ }
+
+ /**
+ * Helper method to read BYTES value from the uncompressed index.
+ */
+ private byte[] getBytesUncompressed(int docId) {
+ int chunkId = docId / _numDocsPerChunk;
+ int chunkRowId = docId % _numDocsPerChunk;
+
+ // These offsets are offset in the data buffer
+ long chunkStartOffset = getChunkPosition(chunkId);
+ long valueStartOffset =
+ chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long)
chunkRowId * ROW_OFFSET_SIZE);
+ long valueEndOffset = getValueEndOffset(chunkId, chunkRowId,
chunkStartOffset);
+
+ byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+ _dataBuffer.copyTo(valueStartOffset, bytes);
+ return bytes;
+ }
+
+ /**
+ * Helper method to compute the end offset of the value in the chunk buffer.
+ */
+ private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+ if (rowId == _numDocsPerChunk - 1) {
+ // Last row in the chunk
+ return chunkBuffer.limit();
+ } else {
+ int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+ if (valueEndOffset == 0) {
+ // Last row in the last chunk (chunk is incomplete, which stores 0 as
the offset for the absent rows)
+ return chunkBuffer.limit();
+ } else {
+ return valueEndOffset;
+ }
+ }
+ }
+
+ /**
+ * Helper method to compute the end offset of the value in the data buffer.
+ */
+ private long getValueEndOffset(int chunkId, int chunkRowId, long
chunkStartOffset) {
+ if (chunkId == _numChunks - 1) {
+ // Last chunk
+ if (chunkRowId == _numDocsPerChunk - 1) {
+ // Last row in the last chunk
+ return _dataBuffer.size();
+ } else {
+ int valueEndOffsetInChunk = _dataBuffer
+ .getInt(chunkStartOffset + (long) (chunkRowId + 1) *
ROW_OFFSET_SIZE);
+ if (valueEndOffsetInChunk == 0) {
+ // Last row in the last chunk (chunk is incomplete, which stores 0
as the offset for the absent rows)
+ return _dataBuffer.size();
+ } else {
+ return chunkStartOffset + valueEndOffsetInChunk;
+ }
+ }
+ } else {
+ if (chunkRowId == _numDocsPerChunk - 1) {
+ // Last row in the chunk
+ return getChunkPosition(chunkId + 1);
+ } else {
+ return chunkStartOffset + _dataBuffer
+ .getInt(chunkStartOffset + (long) (chunkRowId + 1) *
ROW_OFFSET_SIZE);
+ }
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index aba55ca..15b02d6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -167,7 +167,11 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
fileExtension =
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
}
} else {
- fileExtension =
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ if (!columnMetadata.hasDictionary()) {
+ fileExtension =
V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+ } else {
+ fileExtension =
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ }
}
break;
case INVERTED_INDEX:
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..a1f6e2c
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiValueVarByteRawIndexCreatorTest {
+
+ private static final String OUTPUT_DIR =
+ System.getProperty("java.io.tmpdir") + File.separator + "mvVarRawTest";
+
+ @BeforeClass
+ public void setup() throws Exception {
+ FileUtils.forceMkdir(new File(OUTPUT_DIR));
+ }
+
+ /**
+ * Clean up after test
+ */
+ @AfterClass
+ public void cleanup() {
+ FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+ }
+
+ @Test
+ public void testMVString() throws IOException {
+ String column = "testCol";
+ int numDocs = 1000;
+ int maxElements = 50;
+ int maxTotalLength = 500;
+ File file = new File(OUTPUT_DIR, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ file.delete();
+ MultiValueVarByteRawIndexCreator creator = new
MultiValueVarByteRawIndexCreator(
+ new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs,
DataType.STRING, maxTotalLength);
+ List<String[]> inputs = new ArrayList<>();
+ Random random = new Random();
+ for (int i = 0; i < numDocs; i++) {
+ //int length = 1;
+ int length = random.nextInt(10);
+ String[] values = new String[length];
+ for (int j = 0; j < length; j++) {
+ char[] value = new char[length];
+ Arrays.fill(value, 'a');
+ values[j] = new String(value);
+ }
+ inputs.add(values);
+ creator.putStringMV(values);
+ }
+ creator.close();
+
+ //read
+ final PinotDataBuffer buffer = PinotDataBuffer
+ .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+ VarByteChunkMVForwardIndexReader reader = new
VarByteChunkMVForwardIndexReader(buffer,
+ DataType.STRING);
+ final ChunkReaderContext context = reader.createContext();
+ String[] values = new String[maxElements];
+ for (int i = 0; i < numDocs; i++) {
+ int length = reader.getStringMV(i, values, context);
+ String[] readValue = Arrays.copyOf(values, length);
+ Assert.assertEquals(inputs.get(i), readValue);
+ }
+ }
+
+ @Test
+ public void testMVBytes() throws IOException {
+ String column = "testCol";
+ int numDocs = 1000;
+ int maxElements = 50;
+ int maxTotalLength = 500;
+ File file = new File(OUTPUT_DIR, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ file.delete();
+ MultiValueVarByteRawIndexCreator creator = new
MultiValueVarByteRawIndexCreator(
+ new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs,
DataType.BYTES,
+ maxTotalLength);
+ List<byte[][]> inputs = new ArrayList<>();
+ Random random = new Random();
+ for (int i = 0; i < numDocs; i++) {
+ //int length = 1;
+ int length = random.nextInt(10);
+ byte[][] values = new byte[length][];
+ for (int j = 0; j < length; j++) {
+ char[] value = new char[length];
+ Arrays.fill(value, 'a');
+ values[j] = new String(value).getBytes();
+ }
+ inputs.add(values);
+ creator.putBytesMV(values);
+ }
+ creator.close();
+
+ //read
+ final PinotDataBuffer buffer = PinotDataBuffer
+ .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+ VarByteChunkMVForwardIndexReader reader = new
VarByteChunkMVForwardIndexReader(buffer,
+ DataType.BYTES);
+ final ChunkReaderContext context = reader.createContext();
+ byte[][] values = new byte[maxElements][];
+ for (int i = 0; i < numDocs; i++) {
+ int length = reader.getBytesMV(i, values, context);
+ byte[][] readValue = Arrays.copyOf(values, length);
+ for (int j = 0; j < length; j++) {
+ Assert.assertTrue(Arrays.equals(inputs.get(i)[j], readValue[j]));
+ }
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 9f515e8..d8cafdc 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.creator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,7 @@ import
org.apache.pinot.segment.local.loader.LocalSegmentDirectoryLoader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -60,6 +62,7 @@ import org.testng.annotations.Test;
* Class for testing Raw index creators.
*/
public class RawIndexCreatorTest {
+
private static final int NUM_ROWS = 10009;
private static final int MAX_STRING_LENGTH = 101;
@@ -71,6 +74,12 @@ public class RawIndexCreatorTest {
private static final String FLOAT_COLUMN = "floatColumn";
private static final String DOUBLE_COLUMN = "doubleColumn";
private static final String STRING_COLUMN = "stringColumn";
+ private static final String INT_MV_COLUMN = "intMVColumn";
+ private static final String LONG_MV_COLUMN = "longMVColumn";
+ private static final String FLOAT_MV_COLUMN = "floatMVColumn";
+ private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
+ private static final String STRING_MV_COLUMN = "stringMVColumn";
+ private static final String BYTES_MV_COLUMN = "bytesMVColumn";
Random _random;
private RecordReader _recordReader;
@@ -79,8 +88,6 @@ public class RawIndexCreatorTest {
/**
* Setup to build a segment with raw indexes (no-dictionary) of various data
types.
- *
- * @throws Exception
*/
@BeforeClass
public void setup()
@@ -91,8 +98,15 @@ public class RawIndexCreatorTest {
schema.addField(new DimensionFieldSpec(FLOAT_COLUMN, DataType.FLOAT,
true));
schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, DataType.DOUBLE,
true));
schema.addField(new DimensionFieldSpec(STRING_COLUMN, DataType.STRING,
true));
+ schema.addField(new DimensionFieldSpec(INT_MV_COLUMN, DataType.INT,
false));
+ schema.addField(new DimensionFieldSpec(LONG_MV_COLUMN, DataType.LONG,
false));
+ schema.addField(new DimensionFieldSpec(FLOAT_MV_COLUMN, DataType.FLOAT,
false));
+ schema.addField(new DimensionFieldSpec(DOUBLE_MV_COLUMN, DataType.DOUBLE,
false));
+ schema.addField(new DimensionFieldSpec(STRING_MV_COLUMN, DataType.STRING,
false));
+ schema.addField(new DimensionFieldSpec(BYTES_MV_COLUMN, DataType.BYTES,
false));
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("test")
+ .build();
_random = new Random(System.nanoTime());
_recordReader = buildIndex(tableConfig, schema);
@@ -109,7 +123,6 @@ public class RawIndexCreatorTest {
/**
* Test for int raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testIntRawIndexCreator()
@@ -120,7 +133,6 @@ public class RawIndexCreatorTest {
/**
* Test for long raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testLongRawIndexCreator()
@@ -131,7 +143,6 @@ public class RawIndexCreatorTest {
/**
* Test for float raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testFloatRawIndexCreator()
@@ -142,7 +153,6 @@ public class RawIndexCreatorTest {
/**
* Test for double raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testDoubleRawIndexCreator()
@@ -153,19 +163,21 @@ public class RawIndexCreatorTest {
/**
* Test for string raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testStringRawIndexCreator()
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
- try (VarByteChunkSVForwardIndexReader rawIndexReader = new
VarByteChunkSVForwardIndexReader(indexBuffer,
+ try (VarByteChunkSVForwardIndexReader rawIndexReader = new
VarByteChunkSVForwardIndexReader(
+ indexBuffer,
DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
rawIndexReader.createContext()) {
+ BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
rawIndexReader
+ .createContext()) {
_recordReader.rewind();
for (int row = 0; row < NUM_ROWS; row++) {
GenericRow expectedRow = _recordReader.next();
- Assert.assertEquals(rawIndexReader.getString(row, readerContext),
expectedRow.getValue(STRING_COLUMN));
+ Assert.assertEquals(rawIndexReader.getString(row, readerContext),
+ expectedRow.getValue(STRING_COLUMN));
}
}
}
@@ -175,17 +187,88 @@ public class RawIndexCreatorTest {
*
* @param column Column for which to perform the test
* @param dataType Data type of the column
- * @throws Exception
*/
private void testFixedLengthRawIndexCreator(String column, DataType dataType)
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(column);
- try (FixedByteChunkSVForwardIndexReader rawIndexReader = new
FixedByteChunkSVForwardIndexReader(indexBuffer,
- dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext
readerContext = rawIndexReader.createContext()) {
+ try (FixedByteChunkSVForwardIndexReader rawIndexReader = new
FixedByteChunkSVForwardIndexReader(
+ indexBuffer,
+ dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext
readerContext = rawIndexReader
+ .createContext()) {
+ _recordReader.rewind();
+ for (int row = 0; row < NUM_ROWS; row++) {
+ GenericRow expectedRow = _recordReader.next();
+ Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext,
row),
+ expectedRow.getValue(column));
+ }
+ }
+ }
+
+ /**
+ * Test for multi value string raw index creator.
+ * Compares values read from the raw index against expected value.
+ */
+ @Test
+ public void testStringMVRawIndexCreator()
+ throws Exception {
+ PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_MV_COLUMN);
+ try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(
+ indexBuffer,
+ DataType.STRING);
+ BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
rawIndexReader
+ .createContext()) {
_recordReader.rewind();
+ int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+ .getColumnMetadataFor(STRING_MV_COLUMN).getMaxNumberOfMultiValues();
+ final String[] valueBuffer = new String[maxNumberOfMultiValues];
for (int row = 0; row < NUM_ROWS; row++) {
GenericRow expectedRow = _recordReader.next();
- Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext,
row), expectedRow.getValue(column));
+
+ int length = rawIndexReader.getStringMV(row, valueBuffer,
readerContext);
+ String[] readValue = Arrays.copyOf(valueBuffer, length);
+ Object[] writtenValue = (Object[])
expectedRow.getValue(STRING_MV_COLUMN);
+ if (writtenValue == null || writtenValue.length == 0) {
+ writtenValue = new
String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+ }
+ for (int i = 0; i < length; i++) {
+ Object expected = writtenValue[i];
+ Object actual = readValue[i];
+ Assert.assertEquals(expected, actual);
+ }
+ }
+ }
+ }
+
+ /**
+ * Test for multi value bytes raw index creator.
+ * Compares values read from the raw index against expected value.
+ */
+ @Test
+ public void testBytesMVRawIndexCreator()
+ throws Exception {
+ PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
+ try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(
+ indexBuffer, DataType.BYTES);
+ BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext =
rawIndexReader
+ .createContext()) {
+ _recordReader.rewind();
+ int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+ .getColumnMetadataFor(BYTES_MV_COLUMN).getMaxNumberOfMultiValues();
+ final byte[][] valueBuffer = new byte[maxNumberOfMultiValues][];
+ for (int row = 0; row < NUM_ROWS; row++) {
+ GenericRow expectedRow = _recordReader.next();
+
+ int length = rawIndexReader.getBytesMV(row, valueBuffer,
readerContext);
+ byte[][] readValue = Arrays.copyOf(valueBuffer, length);
+ Object[] writtenValue = (Object[])
expectedRow.getValue(BYTES_MV_COLUMN);
+ if (writtenValue == null || writtenValue.length == 0) {
+ writtenValue = new
byte[][]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES};
+ }
+ for (int i = 0; i < length; i++) {
+ Object expected = writtenValue[i];
+ Object actual = readValue[i];
+ Assert.assertTrue(Arrays.equals((byte[]) expected, (byte[]) actual));
+ }
}
}
}
@@ -205,7 +288,6 @@ public class RawIndexCreatorTest {
* Helper method to build a segment containing a single valued string column
with RAW (no-dictionary) index.
*
* @return Array of string values for the rows in the generated index.
- * @throws Exception
*/
private RecordReader buildIndex(TableConfig tableConfig, Schema schema)
throws Exception {
@@ -221,9 +303,17 @@ public class RawIndexCreatorTest {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
Object value;
-
- value = getRandomValue(_random, fieldSpec.getDataType());
- map.put(fieldSpec.getName(), value);
+ if (fieldSpec.isSingleValueField()) {
+ value = getRandomValue(_random, fieldSpec.getDataType());
+ map.put(fieldSpec.getName(), value);
+ } else {
+ int length = _random.nextInt(50);
+ Object[] values = new Object[length];
+ for (int j = 0; j < length; j++) {
+ values[j] = getRandomValue(_random, fieldSpec.getDataType());
+ }
+ map.put(fieldSpec.getName(), values);
+ }
}
GenericRow genericRow = new GenericRow();
@@ -263,8 +353,13 @@ public class RawIndexCreatorTest {
case STRING:
return StringUtil
.sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)),
Integer.MAX_VALUE);
+ case BYTES:
+ return StringUtil
+
.sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)),
Integer.MAX_VALUE)
+ .getBytes();
default:
- throw new UnsupportedOperationException("Unsupported data type for
random value generator: " + dataType);
+ throw new UnsupportedOperationException(
+ "Unsupported data type for random value generator: " + dataType);
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 1d1a8a5..744f0bc 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -39,6 +39,7 @@ public class V1Constants {
public static final String UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION =
".sv.unsorted.fwd";
public static final String SORTED_SV_FORWARD_INDEX_FILE_EXTENSION =
".sv.sorted.fwd";
public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION =
".sv.raw.fwd";
+ public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION =
".mv.raw.fwd";
public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION =
".mv.fwd";
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION =
".bitmap.inv";
public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION =
".bitmap.range";
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
index 9415de8..d636fb1 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
@@ -38,4 +38,6 @@ public interface ChunkCompressor {
*/
int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
throws IOException;
+
+ int maxCompressedSize(int uncompressedSize);
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
index da30237..9707107 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
@@ -85,6 +85,10 @@ public class ColumnIndexCreationInfo implements Serializable
{
return _columnStatistics.getMaxNumberOfMultiValues();
}
+ public int getMaxRowLengthInBytes() {
+ return _columnStatistics.getMaxRowLengthInBytes();
+ }
+
public boolean isAutoGenerated() {
return _isAutoGenerated;
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
index 825c331..70bd584 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
@@ -83,6 +83,13 @@ public interface ColumnStatistics extends Serializable {
int getMaxNumberOfMultiValues();
/**
+ * @return the length of the largest row in bytes for variable length types
+ */
+ default int getMaxRowLengthInBytes() {
+ return -1;
+ }
+
+ /**
* @return Returns if any of the values have nulls in the segments.
*/
boolean hasNull();
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
index dee4db1..e5a21e9 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
@@ -173,4 +173,13 @@ public interface ForwardIndexCreator extends Closeable {
default void putStringMV(String[] values) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Writes the next byte[] type multi-value into the forward index.
+ *
+ * @param values Values to write
+ */
+ default void putBytesMV(byte[][] values) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index fb92bec..6393aaf 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -242,4 +242,23 @@ public interface ForwardIndexReader<T extends
ForwardIndexReaderContext> extends
default int getStringMV(int docId, String[] valueBuffer, T context) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Reads the bytes type multi-value at the given document id into the passed
in value buffer (the buffer size must
+ * be enough to hold all the values for the multi-value entry) and returns
the number of values within the multi-value
+ * entry.
+ *
+ * @param docId Document id
+ * @param valueBuffer Value buffer
+ * @param context Reader context
+ * @return Number of values within the multi-value entry
+ */
+ default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
+ throw new UnsupportedOperationException();
+ }
+
+ default int getFloatMV(int docId, float[] valueBuffer, T context, int[]
parentIndices) {
+ throw new UnsupportedOperationException();
+ }
+
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index b506106..aa41810 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -309,7 +309,7 @@ public class DictionaryToRawIndexConverter {
int lengthOfLongestEntry = (storedType == DataType.STRING) ?
getLengthOfLongestEntry(dictionary) : -1;
try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
- .getRawIndexCreatorForColumn(newSegment, compressionType, column,
storedType, numDocs, lengthOfLongestEntry,
+ .getRawIndexCreatorForSVColumn(newSegment, compressionType, column,
storedType, numDocs, lengthOfLongestEntry,
false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
ForwardIndexReaderContext readerContext = reader.createContext()) {
switch (storedType) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]