This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e1c12ff46 Marks end criteria reached for the segment if the Index cannot consume more rows (#14479)
0e1c12ff46 is described below

commit 0e1c12ff4644af66cd159b209710ee06f5e82a5c
Author: NOOB <[email protected]>
AuthorDate: Fri Dec 6 01:41:30 2024 +0530

    Marks end criteria reached for the segment if the Index cannot consume more rows (#14479)
---
 .../protocols/SegmentCompletionProtocol.java       |   3 +
 .../realtime/RealtimeSegmentDataManager.java       |  12 ++
 .../realtime/RealtimeSegmentDataManagerTest.java   |  23 +++
 .../indexsegment/mutable/MutableSegmentImpl.java   |  20 ++-
 .../forward/FixedByteMVMutableForwardIndex.java    |  10 ++
 .../MutableSegmentEntriesAboveThresholdTest.java   | 194 +++++++++++++++++++++
 .../FixedByteMVMutableForwardIndexTest.java        |  41 +++++
 .../FixedByteSVMutableForwardIndexTest.java        |   1 +
 .../segment/spi/index/mutable/MutableIndex.java    |   7 +
 9 files changed, 310 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 86d880a935..f5aed34154 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -148,6 +148,9 @@ public class SegmentCompletionProtocol {
   public static final String REASON_END_OF_PARTITION_GROUP = 
"endOfPartitionGroup";
   // Stop reason sent by server as force commit message received
   public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = 
"forceCommitMessageReceived";
+  // Stop reason sent by server when a mutable index cannot consume more rows
+  // (e.g. its size is close to its limit, or the number of values for a column
+  // is about to overflow Integer.MAX_VALUE)
+  public static final String REASON_INDEX_CAPACITY_THRESHOLD_BREACHED = 
"indexCapacityThresholdBreached";
 
   // Canned responses
   public static final Response RESP_NOT_LEADER =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 35e8aa3c46..06f4649533 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -363,6 +363,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
               _numRowsConsumed, _numRowsIndexed);
           _stopReason = 
SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
           return true;
+        } else if (!canAddMore()) {
+          _segmentLogger.info(
+              "Stopping consumption as mutable index cannot consume more rows 
- numRowsConsumed={} "
+                  + "numRowsIndexed={}",
+              _numRowsConsumed, _numRowsIndexed);
+          _stopReason = 
SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED;
+          return true;
         }
         return false;
 
@@ -697,6 +704,11 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return prematureExit;
   }
 
+  @VisibleForTesting
+  boolean canAddMore() {
+    return _realtimeSegment.canAddMore();
+  }
+
   public class PartitionConsumer implements Runnable {
     public void run() {
       long initialConsumptionEnd = 0L;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index f7ca4530bd..60a2f19233 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -637,6 +637,19 @@ public class RealtimeSegmentDataManagerTest {
       segmentDataManager._timeSupplier.set(endTime);
       Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
     }
+
+    // Test that end criteria are reached when any of the indexes cannot take more rows
+    try (FakeRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager(false, new TimeSupplier(), null,
+        null, null)) {
+      segmentDataManager._state.set(segmentDataManager, 
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+      Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
+
+      segmentDataManager.setIndexCapacityThresholdBreached(true);
+
+      Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
+      Assert.assertEquals(segmentDataManager.getStopReason(),
+          SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED);
+    }
   }
 
   private void setHasMessagesFetched(FakeRealtimeSegmentDataManager 
segmentDataManager, boolean hasMessagesFetched)
@@ -907,6 +920,7 @@ public class RealtimeSegmentDataManagerTest {
     public Map<Integer, Semaphore> _semaphoreMap;
     public boolean _stubConsumeLoop = true;
     private TimeSupplier _timeSupplier;
+    private boolean _indexCapacityThresholdBreached;
 
     private static InstanceDataManagerConfig makeInstanceDataManagerConfig() {
       InstanceDataManagerConfig dataManagerConfig = 
mock(InstanceDataManagerConfig.class);
@@ -1087,6 +1101,15 @@ public class RealtimeSegmentDataManagerTest {
       setOffset(offset, "_finalOffset");
     }
 
+    @Override
+    protected boolean canAddMore() {
+      return !_indexCapacityThresholdBreached;
+    }
+
+    public void setIndexCapacityThresholdBreached(boolean 
indexCapacityThresholdBreached) {
+      _indexCapacityThresholdBreached = indexCapacityThresholdBreached;
+    }
+
     public boolean invokeEndCriteriaReached() {
       Method endCriteriaReached = null;
       try {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index d7008665fd..1812334f67 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -150,6 +150,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final File _consumerDir;
 
   private final Map<String, IndexContainer> _indexContainerMap = new 
HashMap<>();
+  private boolean _indexCapacityThresholdBreached;
 
   private final IdMap<FixedIntArray> _recordIdMap;
 
@@ -828,7 +829,20 @@ public class MutableSegmentImpl implements MutableSegment {
         Object[] values = (Object[]) value;
         for (Map.Entry<IndexType, MutableIndex> indexEntry : 
indexContainer._mutableIndexes.entrySet()) {
           try {
-            indexEntry.getValue().add(values, dictIds, docId);
+            MutableIndex mutableIndex = indexEntry.getValue();
+            mutableIndex.add(values, dictIds, docId);
+            // A few immutable counterparts of mutable indexes (e.g. FixedBitMVForwardIndex) are bounded in size.
+            // If the number of values overflows or the size exceeds the limit, the mutable index cannot be
+            // converted into an immutable index, the segment build fails, and realtime consumption stops.
+            // The check below is therefore a temporary safeguard against such scenarios until the immutable
+            // index implementations are changed.
+            if (!_indexCapacityThresholdBreached && 
!mutableIndex.canAddMore()) {
+              _logger.info(
+                  "Index: {} for column: {} cannot consume more rows, marking 
_indexCapacityThresholdBreached as true",
+                  indexEntry.getKey(), column
+              );
+              _indexCapacityThresholdBreached = true;
+            }
           } catch (Exception e) {
             recordIndexingError(indexEntry.getKey(), e);
           }
@@ -1265,6 +1279,10 @@ public class MutableSegmentImpl implements MutableSegment {
     return _recordIdMap != null;
   }
 
+  public boolean canAddMore() {
+    return !_indexCapacityThresholdBreached;
+  }
+
   // NOTE: Okay for single-writer
   @SuppressWarnings("NonAtomicOperationOnVolatileField")
   private static class ValuesInfo {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
index e30f15f81d..b1826e08a8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
@@ -104,6 +104,9 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
   private static final int INCREMENT_PERCENTAGE = 100;
   //Increments the Initial size by 100% of initial capacity every time we runs 
out of capacity
 
+  // Conservative cap on the number of values per column, chosen so the immutable index stays under its 2GB size limit
+  private final static int DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN = 
450_000_000;
+
   // For single writer multiple readers setup, use ArrayList for writer and 
CopyOnWriteArrayList for reader
   private final List<FixedByteSingleValueMultiColWriter> _headerWriters = new 
ArrayList<>();
   private final List<FixedByteSingleValueMultiColReader> _headerReaders = new 
CopyOnWriteArrayList<>();
@@ -124,6 +127,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
   private int _currentCapacity = 0;
   private int _prevRowStartIndex = 0;  // Offset in the data-buffer for the 
last row added.
   private int _prevRowLength = 0;  // Number of values in the column for the 
last row added.
+  private int _numValues = 0;
 
   public FixedByteMVMutableForwardIndex(int maxNumberOfMultiValuesPerRow, int 
avgMultiValueCount, int rowCountPerChunk,
       int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, 
String context, boolean isDictionaryEncoded,
@@ -200,6 +204,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
 
   private int updateHeader(int row, int numValues) {
     assert (numValues <= _maxNumberOfMultiValuesPerRow);
+    _numValues += numValues;
     int newStartIndex = _prevRowStartIndex + _prevRowLength;
     if (newStartIndex + numValues > _currentCapacity) {
       addDataBuffer(_incrementalCapacity);
@@ -414,6 +419,11 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
     }
   }
 
+  @Override
+  public boolean canAddMore() {
+    return _numValues < DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN;
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java
new file mode 100644
index 0000000000..1eaaab657d
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentEntriesAboveThresholdTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
+import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MutableSegmentEntriesAboveThresholdTest {
+  private static final File TEMP_DIR =
+      new File(FileUtils.getTempDirectory(), 
MutableSegmentEntriesAboveThresholdTest.class.getSimpleName());
+  private static final String AVRO_FILE = "data/test_data-mv.avro";
+  private Schema _schema;
+
+  private static class FakeMutableForwardIndex implements MutableForwardIndex {
+
+    private final MutableForwardIndex _mutableForwardIndex;
+    private static final int THRESHOLD = 2;
+    private int _numValues;
+
+    FakeMutableForwardIndex(MutableForwardIndex mutableForwardIndex) {
+      _mutableForwardIndex = mutableForwardIndex;
+      _numValues = 0;
+    }
+
+    @Override
+    public boolean canAddMore() {
+      return _numValues < THRESHOLD;
+    }
+
+    @Override
+    public void setDictIdMV(int docId, int[] dictIds) {
+      _numValues += dictIds.length;
+      _mutableForwardIndex.setDictIdMV(docId, dictIds);
+    }
+
+    @Override
+    public int getLengthOfShortestElement() {
+      return _mutableForwardIndex.getLengthOfShortestElement();
+    }
+
+    @Override
+    public int getLengthOfLongestElement() {
+      return _mutableForwardIndex.getLengthOfLongestElement();
+    }
+
+    @Override
+    public void setDictId(int docId, int dictId) {
+      _mutableForwardIndex.setDictId(docId, dictId);
+    }
+
+    @Override
+    public boolean isDictionaryEncoded() {
+      return _mutableForwardIndex.isDictionaryEncoded();
+    }
+
+    @Override
+    public boolean isSingleValue() {
+      return _mutableForwardIndex.isSingleValue();
+    }
+
+    @Override
+    public FieldSpec.DataType getStoredType() {
+      return _mutableForwardIndex.getStoredType();
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      _mutableForwardIndex.close();
+    }
+  }
+
+  private File getAvroFile() {
+    URL resourceUrl = 
MutableSegmentImplTest.class.getClassLoader().getResource(AVRO_FILE);
+    Assert.assertNotNull(resourceUrl);
+    return new File(resourceUrl.getFile());
+  }
+
+  private MutableSegmentImpl getMutableSegment(File avroFile)
+      throws Exception {
+    FileUtils.deleteQuietly(TEMP_DIR);
+
+    SegmentGeneratorConfig config =
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, 
TEMP_DIR, "testTable");
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config);
+    driver.build();
+
+    _schema = config.getSchema();
+    
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema, 
"testSegment");
+    return MutableSegmentImplTestUtils
+        .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet(),
+            Collections.emptyMap(),
+            false, false, null, null, null, null, null, null, 
Collections.emptyList());
+  }
+
+  @Test
+  public void testNoLimitBreached()
+      throws Exception {
+    File avroFile = getAvroFile();
+    MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
+    try (RecordReader recordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), 
null)) {
+      GenericRow reuse = new GenericRow();
+      while (recordReader.hasNext()) {
+        mutableSegment.index(recordReader.next(reuse), defaultMetadata);
+      }
+    }
+    assert mutableSegment.canAddMore();
+  }
+
+  @Test
+  public void testLimitBreached()
+      throws Exception {
+    File avroFile = getAvroFile();
+    MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
+
+    Field indexContainerMapField = 
MutableSegmentImpl.class.getDeclaredField("_indexContainerMap");
+    indexContainerMapField.setAccessible(true);
+    Map<String, Object> colVsIndexContainer = (Map<String, Object>) 
indexContainerMapField.get(mutableSegment);
+
+    for (Map.Entry<String, Object> entry : colVsIndexContainer.entrySet()) {
+      Object indexContainer = entry.getValue();
+      Field mutableIndexesField = 
indexContainer.getClass().getDeclaredField("_mutableIndexes");
+      mutableIndexesField.setAccessible(true);
+      Map<IndexType, MutableIndex> indexTypeVsMutableIndex =
+          (Map<IndexType, MutableIndex>) 
mutableIndexesField.get(indexContainer);
+
+      MutableForwardIndex mutableForwardIndex = null;
+      for (IndexType indexType : indexTypeVsMutableIndex.keySet()) {
+        if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) {
+          mutableForwardIndex = (MutableForwardIndex) 
indexTypeVsMutableIndex.get(indexType);
+        }
+      }
+
+      assert mutableForwardIndex != null;
+
+      indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(),
+          new FakeMutableForwardIndex(mutableForwardIndex));
+    }
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
+    try (RecordReader recordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), 
null)) {
+      GenericRow reuse = new GenericRow();
+      while (recordReader.hasNext()) {
+        mutableSegment.index(recordReader.next(reuse), defaultMetadata);
+      }
+    }
+
+    assert !mutableSegment.canAddMore();
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
index 3268277562..77fd706f3c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.forward.mutable;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Random;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
@@ -78,6 +79,7 @@ public class FixedByteMVMutableForwardIndexTest {
     readerWriter =
         new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 2, 
rows / 2, columnSizeInBytes, _memoryManager,
             "IntArray", isDictionaryEncoded, FieldSpec.DataType.INT);
+    int valuesAdded = 0;
 
     Random r = new Random(seed);
     int[][] data = new int[rows][];
@@ -87,6 +89,7 @@ public class FixedByteMVMutableForwardIndexTest {
         data[i][j] = r.nextInt();
       }
       readerWriter.setIntMV(i, data[i]);
+      valuesAdded += data[i].length;
     }
     int[] ret = new int[maxNumberOfMultiValuesPerRow];
     for (int i = 0; i < rows; i++) {
@@ -94,6 +97,7 @@ public class FixedByteMVMutableForwardIndexTest {
       Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
       Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), 
"Failed with seed=" + seed);
     }
+    validateNumOfValues(readerWriter, valuesAdded);
     readerWriter.close();
   }
 
@@ -106,6 +110,7 @@ public class FixedByteMVMutableForwardIndexTest {
     // transition to new ones
     readerWriter = new FixedByteMVMutableForwardIndex(multiValuesPerRow, 
multiValuesPerRow, multiValuesPerRow * 2,
         columnSizeInBytes, _memoryManager, "IntArrayFixedSize", 
isDictionaryEncoded, FieldSpec.DataType.INT);
+    int valuesAdded = 0;
 
     Random r = new Random(seed);
     int[][] data = new int[rows][];
@@ -115,6 +120,7 @@ public class FixedByteMVMutableForwardIndexTest {
         data[i][j] = r.nextInt();
       }
       readerWriter.setIntMV(i, data[i]);
+      valuesAdded += data[i].length;
     }
     int[] ret = new int[multiValuesPerRow];
     for (int i = 0; i < rows; i++) {
@@ -122,6 +128,7 @@ public class FixedByteMVMutableForwardIndexTest {
       Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
       Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), 
"Failed with seed=" + seed);
     }
+    validateNumOfValues(readerWriter, valuesAdded);
     readerWriter.close();
   }
 
@@ -135,6 +142,7 @@ public class FixedByteMVMutableForwardIndexTest {
     readerWriter =
         new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 3, 
r.nextInt(rows) + 1, columnSizeInBytes,
             _memoryManager, "ZeroSize", isDictionaryEncoded, 
FieldSpec.DataType.INT);
+    int valuesAdded = 0;
 
     int[][] data = new int[rows][];
     for (int i = 0; i < rows; i++) {
@@ -144,9 +152,11 @@ public class FixedByteMVMutableForwardIndexTest {
           data[i][j] = r.nextInt();
         }
         readerWriter.setIntMV(i, data[i]);
+        valuesAdded += data[i].length;
       } else {
         data[i] = new int[0];
         readerWriter.setIntMV(i, data[i]);
+        valuesAdded += data[i].length;
       }
     }
     int[] ret = new int[maxNumberOfMultiValuesPerRow];
@@ -155,6 +165,7 @@ public class FixedByteMVMutableForwardIndexTest {
       Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
       Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), 
"Failed with seed=" + seed);
     }
+    validateNumOfValues(readerWriter, valuesAdded);
     readerWriter.close();
   }
 
@@ -187,6 +198,7 @@ public class FixedByteMVMutableForwardIndexTest {
     final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
     FixedByteMVMutableForwardIndex readerWriter =
         createReaderWriter(FieldSpec.DataType.LONG, r, rows, 
maxNumberOfMultiValuesPerRow, isDictionaryEncoded);
+    int valuesAdded = 0;
 
     long[][] data = new long[rows][];
     for (int i = 0; i < rows; i++) {
@@ -196,9 +208,11 @@ public class FixedByteMVMutableForwardIndexTest {
           data[i][j] = r.nextLong();
         }
         readerWriter.setLongMV(i, data[i]);
+        valuesAdded += data[i].length;
       } else {
         data[i] = new long[0];
         readerWriter.setLongMV(i, data[i]);
+        valuesAdded += data[i].length;
       }
     }
     long[] ret = new long[maxNumberOfMultiValuesPerRow];
@@ -207,6 +221,7 @@ public class FixedByteMVMutableForwardIndexTest {
       Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
       Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), 
"Failed with seed=" + seed);
     }
+    validateNumOfValues(readerWriter, valuesAdded);
     readerWriter.close();
   }
 
@@ -225,6 +240,7 @@ public class FixedByteMVMutableForwardIndexTest {
     final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
     FixedByteMVMutableForwardIndex readerWriter =
         createReaderWriter(FieldSpec.DataType.FLOAT, r, rows, 
maxNumberOfMultiValuesPerRow, isDictoinaryEncoded);
+    int valuesAdded = 0;
 
     float[][] data = new float[rows][];
     for (int i = 0; i < rows; i++) {
@@ -234,9 +250,11 @@ public class FixedByteMVMutableForwardIndexTest {
           data[i][j] = r.nextFloat();
         }
         readerWriter.setFloatMV(i, data[i]);
+        valuesAdded += data[i].length;
       } else {
         data[i] = new float[0];
         readerWriter.setFloatMV(i, data[i]);
+        valuesAdded += data[i].length;
       }
     }
     float[] ret = new float[maxNumberOfMultiValuesPerRow];
@@ -245,6 +263,7 @@ public class FixedByteMVMutableForwardIndexTest {
       Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
       Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), 
"Failed with seed=" + seed);
     }
+    validateNumOfValues(readerWriter, valuesAdded);
     readerWriter.close();
   }
 
@@ -263,6 +282,7 @@ public class FixedByteMVMutableForwardIndexTest {
     final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
     FixedByteMVMutableForwardIndex readerWriter =
         createReaderWriter(FieldSpec.DataType.DOUBLE, r, rows, 
maxNumberOfMultiValuesPerRow, isDictonaryEncoded);
+    int valuesAdded = 0;
 
     double[][] data = new double[rows][];
     for (int i = 0; i < rows; i++) {
@@ -272,9 +292,11 @@ public class FixedByteMVMutableForwardIndexTest {
           data[i][j] = r.nextDouble();
         }
         readerWriter.setDoubleMV(i, data[i]);
+        valuesAdded += data[i].length;
       } else {
         data[i] = new double[0];
         readerWriter.setDoubleMV(i, data[i]);
+        valuesAdded += data[i].length;
       }
     }
     double[] ret = new double[maxNumberOfMultiValuesPerRow];
@@ -283,6 +305,25 @@ public class FixedByteMVMutableForwardIndexTest {
       Assert.assertEquals(data[i].length, length, "Failed with seed=" + seed);
       Assert.assertTrue(Arrays.equals(data[i], Arrays.copyOf(ret, length)), 
"Failed with seed=" + seed);
     }
+    validateNumOfValues(readerWriter, valuesAdded);
     readerWriter.close();
   }
+
+  private int getNumValues(FixedByteMVMutableForwardIndex readerWriter)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field numValuesField = 
FixedByteMVMutableForwardIndex.class.getDeclaredField("_numValues");
+    numValuesField.setAccessible(true);
+    return (int) numValuesField.get(readerWriter);
+  }
+
+  private void validateNumOfValues(FixedByteMVMutableForwardIndex 
readerWriter, int valuesAdded) {
+    int numValuesPresentInIndex;
+    try {
+      numValuesPresentInIndex = getNumValues(readerWriter);
+    } catch (Exception e) {
+      throw new AssertionError("failed to validate the num of values added in 
the index");
+    }
+    Assert.assertEquals(numValuesPresentInIndex, valuesAdded);
+    Assert.assertTrue(readerWriter.canAddMore());
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java
index 1e0d827793..cadea9a1fc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteSVMutableForwardIndexTest.java
@@ -112,6 +112,7 @@ public class FixedByteSVMutableForwardIndexTest {
     for (int i = 0; i < 2 * rows; i++) {
       Assert.assertEquals(readerWriter.getDictId(start + i), 0);
     }
+    Assert.assertTrue(readerWriter.canAddMore());
     readerWriter.close();
   }
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
index dc3bdc9869..494361947b 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableIndex.java
@@ -68,4 +68,11 @@ public interface MutableIndex extends IndexReader {
    */
   default void commit() {
   }
+
+  /**
+   * Returns whether the mutable index can consume more rows. Defaults to {@code true}; bounded implementations
+   * return {@code false} once they reach their capacity threshold.
+   */
+  default boolean canAddMore() {
+    return true;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to