This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4d512a9057 Fix the missing getNumValuesMV() in raw forward index v4 and v5 (#15191)
4d512a9057 is described below
commit 4d512a905721bdc019a20a882ec0e8df12d0ebc6
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 4 17:48:44 2025 -0700
Fix the missing getNumValuesMV() in raw forward index v4 and v5 (#15191)
---
.../forward/VarByteChunkForwardIndexReaderV4.java | 5 ++
.../forward/VarByteChunkForwardIndexReaderV5.java | 12 ++++
.../MultiValueFixedByteRawIndexCreatorTest.java | 74 ++++++++++------------
.../MultiValueVarByteRawIndexCreatorTest.java | 41 ++++++------
.../segment/index/creator/RawIndexCreatorTest.java | 14 ++--
.../index/forward/ForwardIndexTypeTest.java | 2 -
6 files changed, 81 insertions(+), 67 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
index cf2a8b4de4..024d5762f7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
@@ -199,6 +199,11 @@ public class VarByteChunkForwardIndexReaderV4
return ArraySerDeUtils.deserializeBytesArray(context.getValue(docId));
}
+ @Override
+ public int getNumValuesMV(int docId, ReaderContext context) {
+ return ByteBuffer.wrap(context.getValue(docId)).getInt();
+ }
+
@Override
public void close()
throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV5.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV5.java
index e72fedfc58..d49368b006 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV5.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV5.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.segment.index.readers.forward;
+import java.nio.ByteBuffer;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5;
import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
@@ -80,4 +81,15 @@ public class VarByteChunkForwardIndexReaderV5 extends
VarByteChunkForwardIndexRe
public double[] getDoubleMV(int docId,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
return
ArraySerDeUtils.deserializeDoubleArrayWithoutLength(context.getValue(docId));
}
+
+ @Override
+ public int getNumValuesMV(int docId, ReaderContext context) {
+ byte[] bytes = context.getValue(docId);
+ int valueSize = getStoredType().size();
+ if (valueSize > 0) {
+ return bytes.length / valueSize;
+ } else {
+ return ByteBuffer.wrap(bytes).getInt();
+ }
+ }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index 6828964892..1a876970d8 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -31,22 +31,24 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
-import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
+import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
public class MultiValueFixedByteRawIndexCreatorTest implements
PinotBuffersAfterMethodCheckRule {
@@ -57,8 +59,8 @@ public class MultiValueFixedByteRawIndexCreatorTest
implements PinotBuffersAfter
@DataProvider(name = "compressionTypes")
public Object[][] compressionTypes() {
return Arrays.stream(ChunkCompressionType.values())
- .flatMap(ct -> IntStream.of(2, 4).boxed()
- .map(writerVersion -> new Object[]{ct,
writerVersion})).toArray(Object[][]::new);
+ .flatMap(ct -> IntStream.rangeClosed(2, 5).boxed().map(writerVersion
-> new Object[]{ct, writerVersion}))
+ .toArray(Object[][]::new);
}
@BeforeClass
@@ -155,11 +157,6 @@ public class MultiValueFixedByteRawIndexCreatorTest
implements PinotBuffersAfter
maxElements, false, writerVersion, 1024 * 1024, 1000);
}
- public ForwardIndexReader getForwardIndexReader(PinotDataBuffer buffer,
DataType dataType, int writerVersion) {
- return writerVersion == VarByteChunkForwardIndexWriterV4.VERSION ? new
VarByteChunkForwardIndexReaderV4(buffer,
- dataType.getStoredType(), false) : new
FixedByteChunkMVForwardIndexReader(buffer, dataType.getStoredType());
- }
-
public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T>
sizeof, IntFunction<T> constructor,
Injector<T> injector, Extractor<T> extractor, ChunkCompressionType
compressionType, int writerVersion)
throws IOException {
@@ -167,33 +164,33 @@ public class MultiValueFixedByteRawIndexCreatorTest
implements PinotBuffersAfter
int numDocs = inputs.size();
int maxElements =
inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new);
File file = new File(_outputDir, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
- file.delete();
- MultiValueFixedByteRawIndexCreator creator =
- getMultiValueFixedByteRawIndexCreator(compressionType, column,
numDocs, dataType, maxElements, writerVersion);
- inputs.forEach(input -> injector.inject(creator, input));
- creator.close();
-
- //read
- try (final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0,
file.length(), ByteOrder.BIG_ENDIAN,
- "")) {
- ForwardIndexReader reader = getForwardIndexReader(buffer, dataType,
writerVersion);
-
- final ForwardIndexReaderContext context = reader.createContext();
+ FileUtils.deleteQuietly(file);
+ try (MultiValueFixedByteRawIndexCreator creator =
getMultiValueFixedByteRawIndexCreator(compressionType, column,
+ numDocs, dataType, maxElements, writerVersion)) {
+ inputs.forEach(input -> injector.inject(creator, input));
+ }
+
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0,
file.length(), ByteOrder.BIG_ENDIAN, "");
+ ForwardIndexReader reader =
ForwardIndexReaderFactory.createRawIndexReader(buffer, dataType, false);
+ ForwardIndexReaderContext context = reader.createContext()) {
T valueBuffer = constructor.apply(maxElements);
for (int i = 0; i < numDocs; i++) {
- Assert.assertEquals(inputs.get(i), extractor.extract(reader, context,
i, valueBuffer));
+ T input = inputs.get(i);
+ assertEquals(reader.getNumValuesMV(i, context),
sizeof.applyAsInt(input));
+ assertEquals(extractor.extract(reader, context, i, valueBuffer),
input);
}
// Value byte range test
- Assert.assertTrue(reader.isBufferByteRangeInfoSupported());
- Assert.assertFalse(reader.isFixedOffsetMappingType());
- final ForwardIndexReaderContext valueRangeContext =
reader.createContext();
+ assertTrue(reader.isBufferByteRangeInfoSupported());
+ assertFalse(reader.isFixedOffsetMappingType());
List<ForwardIndexReader.ByteRange> ranges = new ArrayList<>();
- for (int i = 0; i < numDocs; i++) {
- try {
- reader.recordDocIdByteRanges(i, valueRangeContext, ranges);
- } catch (Exception e) {
- Assert.fail("Failed to record byte ranges for docId: " + i, e);
+ try (ForwardIndexReaderContext valueRangeContext =
reader.createContext()) {
+ for (int i = 0; i < numDocs; i++) {
+ try {
+ reader.recordDocIdByteRanges(i, valueRangeContext, ranges);
+ } catch (Exception e) {
+ fail("Failed to record byte ranges for docId: " + i, e);
+ }
}
}
}
@@ -208,14 +205,11 @@ public class MultiValueFixedByteRawIndexCreatorTest
implements PinotBuffersAfter
}
private static List<int[]> ints(boolean isFixedMVRowLength) {
- return IntStream.range(0, 1000)
- .mapToObj(i -> new int[isFixedMVRowLength ? 50 : RANDOM.nextInt(50)])
- .peek(array -> {
- for (int i = 0; i < array.length; i++) {
- array[i] = RANDOM.nextInt();
- }
- })
- .collect(Collectors.toList());
+ return IntStream.range(0, 1000).mapToObj(i -> new int[isFixedMVRowLength ?
50 : RANDOM.nextInt(50)]).peek(array -> {
+ for (int i = 0; i < array.length; i++) {
+ array[i] = RANDOM.nextInt();
+ }
+ }).collect(Collectors.toList());
}
private static List<long[]> longs(boolean isFixedMVRowLength) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index 8dde0da31f..4f150b0567 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -34,16 +34,18 @@ import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByte
import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
public class MultiValueVarByteRawIndexCreatorTest implements
PinotBuffersAfterMethodCheckRule {
@@ -58,10 +60,13 @@ public class MultiValueVarByteRawIndexCreatorTest
implements PinotBuffersAfterMe
@DataProvider
public Object[][] params() {
- return
Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType ->
IntStream.of(2, 4).boxed()
- .flatMap(writerVersion -> IntStream.of(10, 15, 20,
1000).boxed().flatMap(maxLength -> Stream.of(true, false)
- .flatMap(
- useFullSize -> IntStream.range(1, 20).map(i -> i * 2 -
1).boxed().map(maxNumEntries -> new Object[]{
+ return Arrays.stream(ChunkCompressionType.values())
+ .flatMap(chunkCompressionType -> IntStream.rangeClosed(2, 5)
+ .boxed()
+ .flatMap(writerVersion -> IntStream.of(10, 100)
+ .boxed()
+ .flatMap(maxLength -> Stream.of(true, false)
+ .flatMap(useFullSize -> IntStream.of(1, 10,
20).boxed().map(maxNumEntries -> new Object[]{
chunkCompressionType, useFullSize, writerVersion,
maxLength, maxNumEntries
})))))
.toArray(Object[][]::new);
@@ -75,16 +80,18 @@ public class MultiValueVarByteRawIndexCreatorTest
implements PinotBuffersAfterMe
@Test(expectedExceptions = IllegalArgumentException.class)
public void testOverflowElementCount()
throws IOException {
- new MultiValueVarByteRawIndexCreator(OUTPUT_DIR,
ChunkCompressionType.PASS_THROUGH,
- "column", 10000, DataType.STRING, 1, Integer.MAX_VALUE / 2);
+ new MultiValueVarByteRawIndexCreator(OUTPUT_DIR,
ChunkCompressionType.PASS_THROUGH, "column", 10000,
+ DataType.STRING, 1, Integer.MAX_VALUE / 2);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testOverflowMaxLengthInBytes()
throws IOException {
- // contrived to produce a positive chunk size > Integer.MAX_VALUE but not
fail num elements checks
- new MultiValueVarByteRawIndexCreator(OUTPUT_DIR,
ChunkCompressionType.PASS_THROUGH,
- "column", 10000, DataType.STRING, Integer.MAX_VALUE - Integer.BYTES -
2 * Integer.BYTES, 2);
+ // Contrived to produce a positive chunk size > Integer.MAX_VALUE but not
fail num elements checks
+ // This check only applies to v2/v3
+ new MultiValueVarByteRawIndexCreator(OUTPUT_DIR,
ChunkCompressionType.PASS_THROUGH, "column", 10000,
+ DataType.STRING, 2, Integer.MAX_VALUE - Integer.BYTES - 2 *
Integer.BYTES, 2,
+ ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}
@Test(dataProvider = "params")
@@ -126,15 +133,15 @@ public class MultiValueVarByteRawIndexCreatorTest
implements PinotBuffersAfterMe
}
}
- //read
try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0,
file.length(), ByteOrder.BIG_ENDIAN, "");
ForwardIndexReader reader =
ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.STRING, false);
ForwardIndexReaderContext context = reader.createContext()) {
String[] values = new String[maxElements];
for (int i = 0; i < numDocs; i++) {
+ String[] input = inputs.get(i);
+ assertEquals(reader.getNumValuesMV(i, context), input.length);
int length = reader.getStringMV(i, values, context);
- String[] readValue = Arrays.copyOf(values, length);
- Assert.assertEquals(inputs.get(i), readValue);
+ assertEquals(Arrays.copyOf(values, length), input);
}
}
}
@@ -178,17 +185,15 @@ public class MultiValueVarByteRawIndexCreatorTest
implements PinotBuffersAfterMe
}
}
- //read
try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0,
file.length(), ByteOrder.BIG_ENDIAN, "");
ForwardIndexReader reader =
ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.BYTES, false);
ForwardIndexReaderContext context = reader.createContext()) {
byte[][] values = new byte[maxElements][];
for (int i = 0; i < numDocs; i++) {
+ byte[][] input = inputs.get(i);
+ assertEquals(reader.getNumValuesMV(i, context), input.length);
int length = reader.getBytesMV(i, values, context);
- byte[][] readValue = Arrays.copyOf(values, length);
- for (int j = 0; j < length; j++) {
- Assert.assertTrue(Arrays.equals(inputs.get(i)[j], readValue[j]));
- }
+ assertEquals(Arrays.copyOf(values, length), input);
}
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 69746ae725..848a62d29c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -31,7 +31,6 @@ import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationD
import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -96,7 +95,8 @@ public class RawIndexCreatorTest implements
PinotBuffersAfterClassCheckRule {
.build();
//@formatter:on
private static final TableConfig TABLE_CONFIG = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
- .setNoDictionaryColumns(SCHEMA.getDimensionNames()).build();
+ .setNoDictionaryColumns(SCHEMA.getDimensionNames())
+ .build();
private static final Random RANDOM = new Random();
private RecordReader _recordReader;
@@ -207,8 +207,9 @@ public class RawIndexCreatorTest implements
PinotBuffersAfterClassCheckRule {
public void testStringMVRawIndexCreator()
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_MV_COLUMN);
- try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(indexBuffer,
- DataType.STRING); ChunkReaderContext readerContext =
rawIndexReader.createContext()) {
+ try (
+ ForwardIndexReader rawIndexReader =
ForwardIndexReaderFactory.createRawIndexReader(indexBuffer, DataType.STRING,
+ false); ForwardIndexReaderContext readerContext =
rawIndexReader.createContext()) {
_recordReader.rewind();
int maxNumberOfMultiValues =
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(STRING_MV_COLUMN).getMaxNumberOfMultiValues();
@@ -239,9 +240,8 @@ public class RawIndexCreatorTest implements
PinotBuffersAfterClassCheckRule {
public void testBytesMVRawIndexCreator()
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
- try (VarByteChunkMVForwardIndexReader rawIndexReader = new
VarByteChunkMVForwardIndexReader(indexBuffer,
- DataType.BYTES);
- ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ try (ForwardIndexReader rawIndexReader =
ForwardIndexReaderFactory.createRawIndexReader(indexBuffer, DataType.BYTES,
+ false); ForwardIndexReaderContext readerContext =
rawIndexReader.createContext()) {
_recordReader.rewind();
int maxNumberOfMultiValues =
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(BYTES_MV_COLUMN).getMaxNumberOfMultiValues();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
index 12f53908be..362c88a8a8 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
@@ -142,7 +142,6 @@ public class ForwardIndexTypeTest {
new ForwardIndexConfig.Builder()
.withCompressionType(ChunkCompressionType.SNAPPY)
.withDeriveNumDocsPerChunk(false)
- .withRawIndexWriterVersion(2)
.build()
);
}
@@ -163,7 +162,6 @@ public class ForwardIndexTypeTest {
new ForwardIndexConfig.Builder()
.withCompressionType(ChunkCompressionType.SNAPPY)
.withDeriveNumDocsPerChunk(false)
- .withRawIndexWriterVersion(2)
.build()
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]