This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new af12ebff76 Fix bug in realtime segment completion for MV Raw columns 
(#10186)
af12ebff76 is described below

commit af12ebff762ad252b29050c1264e79eee11a1c72
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Wed Feb 1 16:50:06 2023 -0800

    Fix bug in realtime segment completion for MV Raw columns (#10186)
    
    * Fix bug in realtime segment completion for MV, raw columns
    
    * Fix tests and checkstyle violation
    
    * Address review comments
---
 .../tests/BaseRealtimeClusterIntegrationTest.java  | 10 +--
 .../indexsegment/mutable/MutableSegmentImpl.java   | 80 +++++++++++++++++-----
 .../converter/stats/MutableColumnStatistics.java   |  5 ++
 .../stats/MutableNoDictionaryColStatistics.java    |  2 +-
 .../index/datasource/MutableDataSource.java        | 17 +++--
 .../segment/spi/datasource/DataSourceMetadata.java |  8 +++
 6 files changed, 93 insertions(+), 29 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
index a8cccc73b7..c95d5d7f2c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
@@ -90,12 +90,12 @@ public abstract class BaseRealtimeClusterIntegrationTest 
extends BaseClusterInte
 
   @Override
   protected List<String> getNoDictionaryColumns() {
+    List<String> noDictionaryColumns = new 
ArrayList<>(super.getNoDictionaryColumns());
     // Randomly set time column as no dictionary column.
-    if (new Random().nextInt(2) == 0) {
-      return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", 
"CRSDepTime", "DaysSinceEpoch");
-    } else {
-      return super.getNoDictionaryColumns();
+    if (new Random().nextBoolean()) {
+      noDictionaryColumns.add("DaysSinceEpoch");
     }
+    return noDictionaryColumns;
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 0efa6395e5..f3e97443cb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -111,6 +111,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pinot.spi.data.FieldSpec.DataType.BYTES;
 import static org.apache.pinot.spi.data.FieldSpec.DataType.STRING;
 
@@ -386,7 +387,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
       // TODO: Support range index and bloom filter for mutable segment
       _indexContainerMap.put(column,
-          new IndexContainer(fieldSpec, partitionFunction, partitions, new 
NumValuesInfo(), forwardIndex, dictionary,
+          new IndexContainer(fieldSpec, partitionFunction, partitions, new 
ValuesInfo(), forwardIndex, dictionary,
               invertedIndexReader, null, textIndex, fstIndex, jsonIndex, 
h3Index, null, nullValueVector, sourceColumn,
               valueAggregator));
     }
@@ -601,7 +602,7 @@ public class MutableSegmentImpl implements MutableSegment {
         Object value = row.getValue(indexContainer._sourceColumn);
 
         // Update numValues info
-        indexContainer._numValuesInfo.updateSVEntry();
+        indexContainer._valuesInfo.updateSVNumValues();
 
         MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
         FieldSpec fieldSpec = indexContainer._fieldSpec;
@@ -661,7 +662,7 @@ public class MutableSegmentImpl implements MutableSegment {
         }
 
         // Update numValues info
-        indexContainer._numValuesInfo.updateSVEntry();
+        indexContainer._valuesInfo.updateSVNumValues();
 
         // Update indexes
         MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
@@ -769,10 +770,12 @@ public class MutableSegmentImpl implements MutableSegment 
{
 
         int[] dictIds = indexContainer._dictIds;
 
+        indexContainer._valuesInfo.updateVarByteMVMaxRowLengthInBytes(value, 
dataType.getStoredType());
+
         if (dictIds != null) {
           // Dictionary encoded
           // Update numValues info
-          indexContainer._numValuesInfo.updateMVEntry(dictIds.length);
+          indexContainer._valuesInfo.updateMVNumValues(dictIds.length);
 
           // Update forward index
           indexContainer._forwardIndex.setDictIdMV(docId, dictIds);
@@ -799,7 +802,7 @@ public class MutableSegmentImpl implements MutableSegment {
                 intValues[i] = (Integer) values[i];
               }
               indexContainer._forwardIndex.setIntMV(docId, intValues);
-              indexContainer._numValuesInfo.updateMVEntry(intValues.length);
+              indexContainer._valuesInfo.updateMVNumValues(intValues.length);
               break;
             case LONG:
               values = (Object[]) value;
@@ -808,7 +811,7 @@ public class MutableSegmentImpl implements MutableSegment {
                 longValues[i] = (Long) values[i];
               }
               indexContainer._forwardIndex.setLongMV(docId, longValues);
-              indexContainer._numValuesInfo.updateMVEntry(longValues.length);
+              indexContainer._valuesInfo.updateMVNumValues(longValues.length);
               break;
             case FLOAT:
               values = (Object[]) value;
@@ -817,7 +820,7 @@ public class MutableSegmentImpl implements MutableSegment {
                 floatValues[i] = (Float) values[i];
               }
               indexContainer._forwardIndex.setFloatMV(docId, floatValues);
-              indexContainer._numValuesInfo.updateMVEntry(floatValues.length);
+              indexContainer._valuesInfo.updateMVNumValues(floatValues.length);
               break;
             case DOUBLE:
               values = (Object[]) value;
@@ -826,7 +829,7 @@ public class MutableSegmentImpl implements MutableSegment {
                 doubleValues[i] = (Double) values[i];
               }
               indexContainer._forwardIndex.setDoubleMV(docId, doubleValues);
-              indexContainer._numValuesInfo.updateMVEntry(doubleValues.length);
+              
indexContainer._valuesInfo.updateMVNumValues(doubleValues.length);
               break;
             default:
               throw new UnsupportedOperationException(
@@ -1243,18 +1246,60 @@ public class MutableSegmentImpl implements 
MutableSegment {
 
   // NOTE: Okay for single-writer
   @SuppressWarnings("NonAtomicOperationOnVolatileField")
-  private static class NumValuesInfo {
+  private static class ValuesInfo {
     volatile int _numValues = 0;
     volatile int _maxNumValuesPerMVEntry = -1;
+    volatile int _varByteMVMaxRowLengthInBytes = -1;
 
-    void updateSVEntry() {
+    void updateSVNumValues() {
       _numValues++;
     }
 
-    void updateMVEntry(int numValuesInMVEntry) {
+    void updateMVNumValues(int numValuesInMVEntry) {
       _numValues += numValuesInMVEntry;
       _maxNumValuesPerMVEntry = Math.max(_maxNumValuesPerMVEntry, 
numValuesInMVEntry);
     }
+
+    /**
+     * When an MV VarByte column is created with noDict, the realtime segment 
is still created with a dictionary.
+     * When the realtime segment is converted to offline segment, the offline 
segment creates a noDict column.
+     * MultiValueVarByteRawIndexCreator requires the maxRowLengthInBytes. 
Refer to OSS issue
+     * https://github.com/apache/pinot/issues/10127 for more details.
+     */
+    void updateVarByteMVMaxRowLengthInBytes(Object entry, DataType dataType) {
+      // MV support for BigDecimal is not available.
+      if (dataType != STRING && dataType != BYTES) {
+        return;
+      }
+
+      Object[] values = (Object[]) entry;
+      int rowLength = 0;
+
+      switch (dataType) {
+        case STRING: {
+          for (Object obj : values) {
+            String value = (String) obj;
+            int length = value.getBytes(UTF_8).length;
+            rowLength += length;
+          }
+
+          _varByteMVMaxRowLengthInBytes = 
Math.max(_varByteMVMaxRowLengthInBytes, rowLength);
+          break;
+        }
+        case BYTES: {
+          for (Object obj : values) {
+            ByteArray value = new ByteArray((byte[]) obj);
+            int length = value.length();
+            rowLength += length;
+          }
+
+          _varByteMVMaxRowLengthInBytes = 
Math.max(_varByteMVMaxRowLengthInBytes, rowLength);
+          break;
+        }
+        default:
+          throw new IllegalStateException("Invalid type=" + dataType);
+      }
+    }
   }
 
   private static Map<String, Pair<String, ValueAggregator>> 
getMetricsAggregators(RealtimeSegmentConfig segmentConfig) {
@@ -1311,7 +1356,7 @@ public class MutableSegmentImpl implements MutableSegment 
{
     final FieldSpec _fieldSpec;
     final PartitionFunction _partitionFunction;
     final Set<Integer> _partitions;
-    final NumValuesInfo _numValuesInfo;
+    final ValuesInfo _valuesInfo;
     final MutableForwardIndex _forwardIndex;
     final MutableDictionary _dictionary;
     final MutableInvertedIndex _invertedIndex;
@@ -1328,12 +1373,13 @@ public class MutableSegmentImpl implements 
MutableSegment {
     volatile Comparable _minValue;
     volatile Comparable _maxValue;
 
+
     // Hold the dictionary id for the latest record
     int _dictId = Integer.MIN_VALUE;
     int[] _dictIds;
 
     IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction 
partitionFunction,
-        @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, 
MutableForwardIndex forwardIndex,
+        @Nullable Set<Integer> partitions, ValuesInfo valuesInfo, 
MutableForwardIndex forwardIndex,
         @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex 
invertedIndex,
         @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex 
textIndex,
         @Nullable MutableTextIndex fstIndex, @Nullable MutableJsonIndex 
jsonIndex, @Nullable MutableH3Index h3Index,
@@ -1342,7 +1388,7 @@ public class MutableSegmentImpl implements MutableSegment 
{
       _fieldSpec = fieldSpec;
       _partitionFunction = partitionFunction;
       _partitions = partitions;
-      _numValuesInfo = numValuesInfo;
+      _valuesInfo = valuesInfo;
       _forwardIndex = forwardIndex;
       _dictionary = dictionary;
       _invertedIndex = invertedIndex;
@@ -1359,10 +1405,10 @@ public class MutableSegmentImpl implements 
MutableSegment {
     }
 
     DataSource toDataSource() {
-      return new MutableDataSource(_fieldSpec, _numDocsIndexed, 
_numValuesInfo._numValues,
-          _numValuesInfo._maxNumValuesPerMVEntry, _dictionary == null ? -1 : 
_dictionary.length(), _partitionFunction,
+      return new MutableDataSource(_fieldSpec, _numDocsIndexed, 
_valuesInfo._numValues,
+          _valuesInfo._maxNumValuesPerMVEntry, _dictionary == null ? -1 : 
_dictionary.length(), _partitionFunction,
           _partitions, _minValue, _maxValue, _forwardIndex, _dictionary, 
_invertedIndex, _rangeIndex, _textIndex,
-          _fstIndex, _jsonIndex, _h3Index, _bloomFilter, _nullValueVector);
+          _fstIndex, _jsonIndex, _h3Index, _bloomFilter, _nullValueVector, 
_valuesInfo._varByteMVMaxRowLengthInBytes);
     }
 
     @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
index cff6260b99..d41d55b37d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
@@ -188,6 +188,11 @@ public class MutableColumnStatistics implements 
ColumnStatistics {
     return _dataSource.getDataSourceMetadata().getPartitionFunction();
   }
 
+  @Override
+  public int getMaxRowLengthInBytes() {
+    return _dataSource.getDataSourceMetadata().getMaxRowLengthInBytes();
+  }
+
   @Override
   public int getNumPartitions() {
     PartitionFunction partitionFunction = 
_dataSource.getDataSourceMetadata().getPartitionFunction();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index c4a3c910b0..0cf4c37b96 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -83,7 +83,7 @@ public class MutableNoDictionaryColStatistics implements 
ColumnStatistics {
 
   @Override
   public int getMaxNumberOfMultiValues() {
-    return 0;
+    return _dataSourceMetadata.getMaxNumValuesPerMVEntry();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
index 90bbb3b773..f09d8d3d42 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
@@ -46,11 +46,10 @@ public class MutableDataSource extends BaseDataSource {
       @Nullable InvertedIndexReader invertedIndex, @Nullable RangeIndexReader 
rangeIndex,
       @Nullable TextIndexReader textIndex, @Nullable TextIndexReader fstIndex, 
@Nullable JsonIndexReader jsonIndex,
       @Nullable H3IndexReader h3Index, @Nullable BloomFilterReader bloomFilter,
-      @Nullable NullValueVectorReader nullValueVector) {
+      @Nullable NullValueVectorReader nullValueVector, int 
maxRowLengthInBytes) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, 
maxNumValuesPerMVEntry, cardinality,
-            partitionFunction,
-            partitions, minValue, maxValue), forwardIndex, dictionary, 
invertedIndex, rangeIndex, textIndex, fstIndex,
-        jsonIndex, h3Index, bloomFilter, nullValueVector);
+            partitionFunction, partitions, minValue, maxValue, 
maxRowLengthInBytes), forwardIndex, dictionary,
+        invertedIndex, rangeIndex, textIndex, fstIndex, jsonIndex, h3Index, 
bloomFilter, nullValueVector);
   }
 
   private static class MutableDataSourceMetadata implements DataSourceMetadata 
{
@@ -63,10 +62,11 @@ public class MutableDataSource extends BaseDataSource {
     final Set<Integer> _partitions;
     final Comparable _minValue;
     final Comparable _maxValue;
+    final int _maxRowLengthInBytes;
 
     MutableDataSourceMetadata(FieldSpec fieldSpec, int numDocs, int numValues, 
int maxNumValuesPerMVEntry,
         int cardinality, @Nullable PartitionFunction partitionFunction, 
@Nullable Set<Integer> partitions,
-        @Nullable Comparable minValue, @Nullable Comparable maxValue) {
+        @Nullable Comparable minValue, @Nullable Comparable maxValue, int 
maxRowLengthInBytes) {
       _fieldSpec = fieldSpec;
       _numDocs = numDocs;
       _numValues = numValues;
@@ -81,6 +81,7 @@ public class MutableDataSource extends BaseDataSource {
       _minValue = minValue;
       _maxValue = maxValue;
       _cardinality = cardinality;
+      _maxRowLengthInBytes = maxRowLengthInBytes;
     }
 
     @Override
@@ -115,7 +116,6 @@ public class MutableDataSource extends BaseDataSource {
       return _minValue;
     }
 
-    @Nullable
     @Override
     public Comparable getMaxValue() {
       return _maxValue;
@@ -137,5 +137,10 @@ public class MutableDataSource extends BaseDataSource {
     public int getCardinality() {
       return _cardinality;
     }
+
+    @Override
+    public int getMaxRowLengthInBytes() {
+      return _maxRowLengthInBytes;
+    }
   }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
index 494129f0a1..3f14601d32 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java
@@ -100,4 +100,12 @@ public interface DataSourceMetadata {
    * Returns the cardinality of the column, {@code -1} if not applicable
    */
   int getCardinality();
+
+  /**
+   * Returns the max row length in bytes for a var byte MV column. {@code -1} 
if not applicable.
+   * @return
+   */
+  default int getMaxRowLengthInBytes() {
+    return -1;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to