This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0e1c12ff46 Marks end criteria reached for the segment if the Index cannot consume more rows (#14479)
0e1c12ff46 is described below
commit 0e1c12ff4644af66cd159b209710ee06f5e82a5c
Author: NOOB <[email protected]>
AuthorDate: Fri Dec 6 01:41:30 2024 +0530
Marks end criteria reached for the segment if the Index cannot consume more rows (#14479)
---
.../protocols/SegmentCompletionProtocol.java | 3 +
.../realtime/RealtimeSegmentDataManager.java | 12 ++
.../realtime/RealtimeSegmentDataManagerTest.java | 23 +++
.../indexsegment/mutable/MutableSegmentImpl.java | 20 ++-
.../forward/FixedByteMVMutableForwardIndex.java | 10 ++
.../MutableSegmentEntriesAboveThresholdTest.java | 194 +++++++++++++++++++++
.../FixedByteMVMutableForwardIndexTest.java | 41 +++++
.../FixedByteSVMutableForwardIndexTest.java | 1 +
.../segment/spi/index/mutable/MutableIndex.java | 7 +
9 files changed, 310 insertions(+), 1 deletion(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 86d880a935..f5aed34154 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -148,6 +148,9 @@ public class SegmentCompletionProtocol {
public static final String REASON_END_OF_PARTITION_GROUP =
"endOfPartitionGroup";
// Stop reason sent by server as force commit message received
public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED =
"forceCommitMessageReceived";
+ // Stop reason sent by server as mutable index cannot consume more rows
+ // (like size reaching close to its limit or number of col values for a col
is about to overflow int max)
+ public static final String REASON_INDEX_CAPACITY_THRESHOLD_BREACHED =
"indexCapacityThresholdBreached";
// Canned responses
public static final Response RESP_NOT_LEADER =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 35e8aa3c46..06f4649533 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -363,6 +363,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
_numRowsConsumed, _numRowsIndexed);
_stopReason =
SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
return true;
+ } else if (!canAddMore()) {
+ _segmentLogger.info(
+ "Stopping consumption as mutable index cannot consume more rows
- numRowsConsumed={} "
+ + "numRowsIndexed={}",
+ _numRowsConsumed, _numRowsIndexed);
+ _stopReason =
SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED;
+ return true;
}
return false;
@@ -697,6 +704,11 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
return prematureExit;
}
+ @VisibleForTesting
+ boolean canAddMore() {
+ return _realtimeSegment.canAddMore();
+ }
+
public class PartitionConsumer implements Runnable {
public void run() {
long initialConsumptionEnd = 0L;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index f7ca4530bd..60a2f19233 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -637,6 +637,19 @@ public class RealtimeSegmentDataManagerTest {
segmentDataManager._timeSupplier.set(endTime);
Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
}
+
+ // test end criteria reached if any of the index cannot take more rows
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(false, new TimeSupplier(), null,
+ null, null)) {
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+ Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
+
+ segmentDataManager.setIndexCapacityThresholdBreached(true);
+
+ Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
+ Assert.assertEquals(segmentDataManager.getStopReason(),
+ SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED);
+ }
}
private void setHasMessagesFetched(FakeRealtimeSegmentDataManager
segmentDataManager, boolean hasMessagesFetched)
@@ -907,6 +920,7 @@ public class RealtimeSegmentDataManagerTest {
public Map<Integer, Semaphore> _semaphoreMap;
public boolean _stubConsumeLoop = true;
private TimeSupplier _timeSupplier;
+ private boolean _indexCapacityThresholdBreached;
private static InstanceDataManagerConfig makeInstanceDataManagerConfig() {
InstanceDataManagerConfig dataManagerConfig =
mock(InstanceDataManagerConfig.class);
@@ -1087,6 +1101,15 @@ public class RealtimeSegmentDataManagerTest {
setOffset(offset, "_finalOffset");
}
+ @Override
+ protected boolean canAddMore() {
+ return !_indexCapacityThresholdBreached;
+ }
+
+ public void setIndexCapacityThresholdBreached(boolean
indexCapacityThresholdBreached) {
+ _indexCapacityThresholdBreached = indexCapacityThresholdBreached;
+ }
+
public boolean invokeEndCriteriaReached() {
Method endCriteriaReached = null;
try {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index d7008665fd..1812334f67 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -150,6 +150,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final File _consumerDir;
private final Map<String, IndexContainer> _indexContainerMap = new
HashMap<>();
+ private boolean _indexCapacityThresholdBreached;
private final IdMap<FixedIntArray> _recordIdMap;
@@ -828,7 +829,20 @@ public class MutableSegmentImpl implements MutableSegment {
Object[] values = (Object[]) value;
for (Map.Entry<IndexType, MutableIndex> indexEntry :
indexContainer._mutableIndexes.entrySet()) {
try {
- indexEntry.getValue().add(values, dictIds, docId);
+ MutableIndex mutableIndex = indexEntry.getValue();
+ mutableIndex.add(values, dictIds, docId);
+ // Few of the Immutable version of the mutable index are bounded
by size like FixedBitMVForwardIndex.
+ // If num of values overflows or size is above limit, A mutable
index is unable to convert to
+ // an immutable index and segment build fails causing the realtime
consumption to stop.
+ // Hence, The below check is a temporary measure to avoid such
scenarios until immutable index
+ // implementations are changed.
+ if (!_indexCapacityThresholdBreached &&
!mutableIndex.canAddMore()) {
+ _logger.info(
+ "Index: {} for column: {} cannot consume more rows, marking
_indexCapacityThresholdBreached as true",
+ indexEntry.getKey(), column
+ );
+ _indexCapacityThresholdBreached = true;
+ }
} catch (Exception e) {
recordIndexingError(indexEntry.getKey(), e);
}
@@ -1265,6 +1279,10 @@ public class MutableSegmentImpl implements MutableSegment {
return _recordIdMap != null;
}
+ public boolean canAddMore() {
+ return !_indexCapacityThresholdBreached;
+ }
+
// NOTE: Okay for single-writer
@SuppressWarnings("NonAtomicOperationOnVolatileField")
private static class ValuesInfo {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
index e30f15f81d..b1826e08a8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
@@ -104,6 +104,9 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private static final int INCREMENT_PERCENTAGE = 100;
//Increments the Initial size by 100% of initial capacity every time we runs
out of capacity
+ // Conservative figure to not breach 2GB size limit for immutable index
+ private final static int DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN =
450_000_000;
+
// For single writer multiple readers setup, use ArrayList for writer and
CopyOnWriteArrayList for reader
private final List<FixedByteSingleValueMultiColWriter> _headerWriters = new
ArrayList<>();
private final List<FixedByteSingleValueMultiColReader> _headerReaders = new
CopyOnWriteArrayList<>();
@@ -124,6 +127,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private int _currentCapacity = 0;
private int _prevRowStartIndex = 0; // Offset in the data-buffer for the
last row added.
private int _prevRowLength = 0; // Number of values in the column for the
last row added.
+ private int _numValues = 0;
public FixedByteMVMutableForwardIndex(int maxNumberOfMultiValuesPerRow, int
avgMultiValueCount, int rowCountPerChunk,
int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager,
String context, boolean isDictionaryEncoded,
@@ -200,6 +204,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private int updateHeader(int row, int numValues) {
assert (numValues <= _maxNumberOfMultiValuesPerRow);
+ _numValues += numValues;
int newStartIndex = _prevRowStartIndex + _prevRowLength;
if (newStartIndex + numValues > _currentCapacity) {
addDataBuffer(_incrementalCapacity);
@@ -414,6 +419,11 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
}
}
+ @Override
+ public boolean canAddMore() {
+ return _numValues < DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN;
+ }
+
@Override
public void close()
throws IOException {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java
new file mode 100644
index 0000000000..1eaaab657d
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
+import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MutableSegmentEntriesAboveThresholdTest {
+ private static final File TEMP_DIR =
+ new File(FileUtils.getTempDirectory(),
MutableSegmentEntriesAboveThresholdTest.class.getSimpleName());
+ private static final String AVRO_FILE = "data/test_data-mv.avro";
+ private Schema _schema;
+
+ private static class FakeMutableForwardIndex implements MutableForwardIndex {
+
+ private final MutableForwardIndex _mutableForwardIndex;
+ private static final int THRESHOLD = 2;
+ private int _numValues;
+
+ FakeMutableForwardIndex(MutableForwardIndex mutableForwardIndex) {
+ _mutableForwardIndex = mutableForwardIndex;
+ _numValues = 0;
+ }
+
+ @Override
+ public boolean canAddMore() {
+ return _numValues < THRESHOLD;
+ }
+
+ @Override
+ public void setDictIdMV(int docId, int[] dictIds) {
+ _numValues += dictIds.length;
+ _mutableForwardIndex.setDictIdMV(docId, dictIds);
+ }
+
+ @Override
+ public int getLengthOfShortestElement() {
+ return _mutableForwardIndex.getLengthOfShortestElement();
+ }
+
+ @Override
+ public int getLengthOfLongestElement() {
+ return _mutableForwardIndex.getLengthOfLongestElement();
+ }
+
+ @Override
+ public void setDictId(int docId, int dictId) {
+ _mutableForwardIndex.setDictId(docId, dictId);
+ }
+
+ @Override
+ public boolean isDictionaryEncoded() {
+ return _mutableForwardIndex.isDictionaryEncoded();
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return _mutableForwardIndex.isSingleValue();
+ }
+
+ @Override
+ public FieldSpec.DataType getStoredType() {
+ return _mutableForwardIndex.getStoredType();
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _mutableForwardIndex.close();
+ }
+ }
+
+ private File getAvroFile() {
+ URL resourceUrl =
MutableSegmentImplTest.class.getClassLoader().getResource(AVRO_FILE);
+ Assert.assertNotNull(resourceUrl);
+ return new File(resourceUrl.getFile());
+ }
+
+ private MutableSegmentImpl getMutableSegment(File avroFile)
+ throws Exception {
+ FileUtils.deleteQuietly(TEMP_DIR);
+
+ SegmentGeneratorConfig config =
+ SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile,
TEMP_DIR, "testTable");
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config);
+ driver.build();
+
+ _schema = config.getSchema();
+
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema,
"testSegment");
+ return MutableSegmentImplTestUtils
+ .createMutableSegmentImpl(_schema, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
+ Collections.emptyMap(),
+ false, false, null, null, null, null, null, null,
Collections.emptyList());
+ }
+
+ @Test
+ public void testNoLimitBreached()
+ throws Exception {
+ File avroFile = getAvroFile();
+ MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
+ StreamMessageMetadata defaultMetadata = new
StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
+ try (RecordReader recordReader = RecordReaderFactory
+ .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(),
null)) {
+ GenericRow reuse = new GenericRow();
+ while (recordReader.hasNext()) {
+ mutableSegment.index(recordReader.next(reuse), defaultMetadata);
+ }
+ }
+ assert mutableSegment.canAddMore();
+ }
+
+ @Test
+ public void testLimitBreached()
+ throws Exception {
+ File avroFile = getAvroFile();
+ MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
+
+ Field indexContainerMapField =
MutableSegmentImpl.class.getDeclaredField("_indexContainerMap");
+ indexContainerMapField.setAccessible(true);
+ Map<String, Object> colVsIndexContainer = (Map<String, Object>)
indexContainerMapField.get(mutableSegment);
+
+ for (Map.Entry<String, Object> entry : colVsIndexContainer.entrySet()) {
+ Object indexContainer = entry.getValue();
+ Field mutableIndexesField =
indexContainer.getClass().getDeclaredField("_mutableIndexes");
+ mutableIndexesField.setAccessible(true);
+ Map<IndexType, MutableIndex> indexTypeVsMutableIndex =
+ (Map<IndexType, MutableIndex>)
mutableIndexesField.get(indexContainer);
+
+ MutableForwardIndex mutableForwardIndex = null;
+ for (IndexType indexType : indexTypeVsMutableIndex.keySet()) {
+ if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) {
+ mutableForwardIndex = (MutableForwardIndex)
indexTypeVsMutableIndex.get(indexType);
+ }
+ }
+
+ assert mutableForwardIndex != null;
+
+ indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(),
+ new FakeMutableForwardIndex(mutableForwardIndex));
+ }
+ StreamMessageMetadata defaultMetadata = new
StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
+ try (RecordReader recordReader = RecordReaderFactory
+ .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(),
null)) {
+ GenericRow reuse = new GenericRow();
+ while (recordReader.hasNext()) {
+ mutableSegment.index(recordReader.next(reuse), defaultMetadata);
+ }
+ }
+
+ assert !mutableSegment.canAddMore();
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
index 3268277562..77fd706f3c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.segment.index.forward.mutable;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Random;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
@@ -78,6 +79,7 @@ public class FixedByteMVMutableForwardIndexTest {
readerWriter =
new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 2,
rows / 2, columnSizeInBytes, _memoryManager,
"IntArray", isDictionaryEncoded, FieldSpec.DataType.INT);
+ int valuesAdded = 0;
Random r = new Random(seed);
int[][] data = new int[rows][];
@@ -87,6 +89,7 @@ public class FixedByteMVMutableForwardIndexTest {
data[i][j] = r.nextInt();
}
readerWriter.setIntMV(i, data[i]);
+ valuesAdded += data[i].length;
}
int[] ret = new int[maxNumberOfMultiValuesPerRow];
for (int i = 0; i < rows; i++) {
@@ -94,6 +97,7 @@ public class FixedByteMVMutableForwardIndexTest {
Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)),
"Failed with seed=" + seed);
}
+ validateNumOfValues(readerWriter, valuesAdded);
readerWriter.close();
}
@@ -106,6 +110,7 @@ public class FixedByteMVMutableForwardIndexTest {
// transition to new ones
readerWriter = new FixedByteMVMutableForwardIndex(multiValuesPerRow,
multiValuesPerRow, multiValuesPerRow * 2,
columnSizeInBytes, _memoryManager, "IntArrayFixedSize",
isDictionaryEncoded, FieldSpec.DataType.INT);
+ int valuesAdded = 0;
Random r = new Random(seed);
int[][] data = new int[rows][];
@@ -115,6 +120,7 @@ public class FixedByteMVMutableForwardIndexTest {
data[i][j] = r.nextInt();
}
readerWriter.setIntMV(i, data[i]);
+ valuesAdded += data[i].length;
}
int[] ret = new int[multiValuesPerRow];
for (int i = 0; i < rows; i++) {
@@ -122,6 +128,7 @@ public class FixedByteMVMutableForwardIndexTest {
Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)),
"Failed with seed=" + seed);
}
+ validateNumOfValues(readerWriter, valuesAdded);
readerWriter.close();
}
@@ -135,6 +142,7 @@ public class FixedByteMVMutableForwardIndexTest {
readerWriter =
new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 3,
r.nextInt(rows) + 1, columnSizeInBytes,
_memoryManager, "ZeroSize", isDictionaryEncoded,
FieldSpec.DataType.INT);
+ int valuesAdded = 0;
int[][] data = new int[rows][];
for (int i = 0; i < rows; i++) {
@@ -144,9 +152,11 @@ public class FixedByteMVMutableForwardIndexTest {
data[i][j] = r.nextInt();
}
readerWriter.setIntMV(i, data[i]);
+ valuesAdded += data[i].length;
} else {
data[i] = new int[0];
readerWriter.setIntMV(i, data[i]);
+ valuesAdded += data[i].length;
}
}
int[] ret = new int[maxNumberOfMultiValuesPerRow];
@@ -155,6 +165,7 @@ public class FixedByteMVMutableForwardIndexTest {
Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)),
"Failed with seed=" + seed);
}
+ validateNumOfValues(readerWriter, valuesAdded);
readerWriter.close();
}
@@ -187,6 +198,7 @@ public class FixedByteMVMutableForwardIndexTest {
final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
FixedByteMVMutableForwardIndex readerWriter =
createReaderWriter(FieldSpec.DataType.LONG, r, rows,
maxNumberOfMultiValuesPerRow, isDictionaryEncoded);
+ int valuesAdded = 0;
long[][] data = new long[rows][];
for (int i = 0; i < rows; i++) {
@@ -196,9 +208,11 @@ public class FixedByteMVMutableForwardIndexTest {
data[i][j] = r.nextLong();
}
readerWriter.setLongMV(i, data[i]);
+ valuesAdded += data[i].length;
} else {
data[i] = new long[0];
readerWriter.setLongMV(i, data[i]);
+ valuesAdded += data[i].length;
}
}
long[] ret = new long[maxNumberOfMultiValuesPerRow];
@@ -207,6 +221,7 @@ public class FixedByteMVMutableForwardIndexTest {
Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)),
"Failed with seed=" + seed);
}
+ validateNumOfValues(readerWriter, valuesAdded);
readerWriter.close();
}
@@ -225,6 +240,7 @@ public class FixedByteMVMutableForwardIndexTest {
final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
FixedByteMVMutableForwardIndex readerWriter =
createReaderWriter(FieldSpec.DataType.FLOAT, r, rows,
maxNumberOfMultiValuesPerRow, isDictoinaryEncoded);
+ int valuesAdded = 0;
float[][] data = new float[rows][];
for (int i = 0; i < rows; i++) {
@@ -234,9 +250,11 @@ public class FixedByteMVMutableForwardIndexTest {
data[i][j] = r.nextFloat();
}
readerWriter.setFloatMV(i, data[i]);
+ valuesAdded += data[i].length;
} else {
data[i] = new float[0];
readerWriter.setFloatMV(i, data[i]);
+ valuesAdded += data[i].length;
}
}
float[] ret = new float[maxNumberOfMultiValuesPerRow];
@@ -245,6 +263,7 @@ public class FixedByteMVMutableForwardIndexTest {
Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)),
"Failed with seed=" + seed);
}
+ validateNumOfValues(readerWriter, valuesAdded);
readerWriter.close();
}
@@ -263,6 +282,7 @@ public class FixedByteMVMutableForwardIndexTest {
final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
FixedByteMVMutableForwardIndex readerWriter =
createReaderWriter(FieldSpec.DataType.DOUBLE, r, rows,
maxNumberOfMultiValuesPerRow, isDictonaryEncoded);
+ int valuesAdded = 0;
double[][] data = new double[rows][];
for (int i = 0; i < rows; i++) {
@@ -272,9 +292,11 @@ public class FixedByteMVMutableForwardIndexTest {
data[i][j] = r.nextDouble();
}
readerWriter.setDoubleMV(i, data[i]);
+ valuesAdded += data[i].length;
} else {
data[i] = new double[0];
readerWriter.setDoubleMV(i, data[i]);
+ valuesAdded += data[i].length;
}
}
double[] ret = new double[maxNumberOfMultiValuesPerRow];
@@ -283,6 +305,25 @@ public class FixedByteMVMutableForwardIndexTest {
Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)),
"Failed with seed=" + seed);
}
+ validateNumOfValues(readerWriter, valuesAdded);
readerWriter.close();
}
+
+ private int getNumValues(FixedByteMVMutableForwardIndex readerWriter)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field numValuesField =
FixedByteMVMutableForwardIndex.class.getDeclaredField("_numValues");
+ numValuesField.setAccessible(true);
+ return (int) numValuesField.get(readerWriter);
+ }
+
+ private void validateNumOfValues(FixedByteMVMutableForwardIndex
readerWriter, int valuesAdded) {
+ int numValuesPresentInIndex;
+ try {
+ numValuesPresentInIndex = getNumValues(readerWriter);
+ } catch (Exception e) {
+ throw new AssertionError("failed to validate the num of values added in
the index");
+ }
+ Assert.assertEquals(numValuesPresentInIndex, valuesAdded);
+ Assert.assertTrue(readerWriter.canAddMore());
+ }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java
index 1e0d827793..cadea9a1fc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java
@@ -112,6 +112,7 @@ public class FixedByteSVMutableForwardIndexTest {
for (int i = 0; i < 2 * rows; i++) {
Assert.assertEquals(readerWriter.getDictId(start + i), 0);
}
+ Assert.assertTrue(readerWriter.canAddMore());
readerWriter.close();
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
index dc3bdc9869..494361947b 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
@@ -68,4 +68,11 @@ public interface MutableIndex extends IndexReader {
*/
default void commit() {
}
+
+ /**
+ * Returns a boolean denoting whether the mutable index can consume any more
rows or not.
+ */
+ default boolean canAddMore() {
+ return true;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]