This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new af12ebff76 Fix bug is realtime segment completion for MV Raw columns (#10186) af12ebff76 is described below commit af12ebff762ad252b29050c1264e79eee11a1c72 Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Wed Feb 1 16:50:06 2023 -0800 Fix bug is realtime segment completion for MV Raw columns (#10186) * Fix bug is realtime segment completion for MV, raw columns * Fix tests and checkstyle violation * Address review comments --- .../tests/BaseRealtimeClusterIntegrationTest.java | 10 +-- .../indexsegment/mutable/MutableSegmentImpl.java | 80 +++++++++++++++++----- .../converter/stats/MutableColumnStatistics.java | 5 ++ .../stats/MutableNoDictionaryColStatistics.java | 2 +- .../index/datasource/MutableDataSource.java | 17 +++-- .../segment/spi/datasource/DataSourceMetadata.java | 8 +++ 6 files changed, 93 insertions(+), 29 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java index a8cccc73b7..c95d5d7f2c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java @@ -19,7 +19,7 @@ package org.apache.pinot.integration.tests; import java.io.File; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.commons.io.FileUtils; @@ -90,12 +90,12 @@ public abstract class BaseRealtimeClusterIntegrationTest extends BaseClusterInte @Override protected List<String> getNoDictionaryColumns() { + List<String> noDictionaryColumns = new ArrayList<>(super.getNoDictionaryColumns()); // Randomly set time column as no dictionary column. - if (new Random().nextInt(2) == 0) { - return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "DaysSinceEpoch"); - } else { - return super.getNoDictionaryColumns(); + if (new Random().nextBoolean()) { + noDictionaryColumns.add("DaysSinceEpoch"); } + return noDictionaryColumns; } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 0efa6395e5..f3e97443cb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -111,6 +111,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pinot.spi.data.FieldSpec.DataType.BYTES; import static org.apache.pinot.spi.data.FieldSpec.DataType.STRING; @@ -386,7 +387,7 @@ public class MutableSegmentImpl implements MutableSegment { // TODO: Support range index and bloom filter for mutable segment _indexContainerMap.put(column, - new IndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary, + new IndexContainer(fieldSpec, partitionFunction, partitions, new ValuesInfo(), forwardIndex, dictionary, invertedIndexReader, null, textIndex, fstIndex, jsonIndex, h3Index, null, nullValueVector, sourceColumn, valueAggregator)); } @@ -601,7 +602,7 @@ public class MutableSegmentImpl implements MutableSegment { Object value = row.getValue(indexContainer._sourceColumn); // Update numValues info - indexContainer._numValuesInfo.updateSVEntry(); + indexContainer._valuesInfo.updateSVNumValues(); MutableForwardIndex forwardIndex = indexContainer._forwardIndex; FieldSpec fieldSpec = indexContainer._fieldSpec; @@ -661,7 +662,7 @@ public class MutableSegmentImpl implements MutableSegment { } // Update numValues info - indexContainer._numValuesInfo.updateSVEntry(); + indexContainer._valuesInfo.updateSVNumValues(); // Update indexes MutableForwardIndex forwardIndex = indexContainer._forwardIndex; @@ -769,10 +770,12 @@ public class MutableSegmentImpl implements MutableSegment { int[] dictIds = indexContainer._dictIds; + indexContainer._valuesInfo.updateVarByteMVMaxRowLengthInBytes(value, dataType.getStoredType()); + if (dictIds != null) { // Dictionary encoded // Update numValues info - indexContainer._numValuesInfo.updateMVEntry(dictIds.length); + indexContainer._valuesInfo.updateMVNumValues(dictIds.length); // Update forward index indexContainer._forwardIndex.setDictIdMV(docId, dictIds); @@ -799,7 +802,7 @@ public class MutableSegmentImpl implements MutableSegment { intValues[i] = (Integer) values[i]; } indexContainer._forwardIndex.setIntMV(docId, intValues); - indexContainer._numValuesInfo.updateMVEntry(intValues.length); + indexContainer._valuesInfo.updateMVNumValues(intValues.length); break; case LONG: values = (Object[]) value; @@ -808,7 +811,7 @@ public class MutableSegmentImpl implements MutableSegment { longValues[i] = (Long) values[i]; } indexContainer._forwardIndex.setLongMV(docId, longValues); - indexContainer._numValuesInfo.updateMVEntry(longValues.length); + indexContainer._valuesInfo.updateMVNumValues(longValues.length); break; case FLOAT: values = (Object[]) value; @@ -817,7 +820,7 @@ public class MutableSegmentImpl implements MutableSegment { floatValues[i] = (Float) values[i]; } indexContainer._forwardIndex.setFloatMV(docId, floatValues); - indexContainer._numValuesInfo.updateMVEntry(floatValues.length); + indexContainer._valuesInfo.updateMVNumValues(floatValues.length); break; case DOUBLE: values = (Object[]) value; @@ -826,7 +829,7 @@ public class MutableSegmentImpl implements MutableSegment { doubleValues[i] = (Double) values[i]; } indexContainer._forwardIndex.setDoubleMV(docId, doubleValues); - indexContainer._numValuesInfo.updateMVEntry(doubleValues.length); + indexContainer._valuesInfo.updateMVNumValues(doubleValues.length); break; default: throw new UnsupportedOperationException( @@ -1243,18 +1246,60 @@ public class MutableSegmentImpl implements MutableSegment { // NOTE: Okay for single-writer @SuppressWarnings("NonAtomicOperationOnVolatileField") - private static class NumValuesInfo { + private static class ValuesInfo { volatile int _numValues = 0; volatile int _maxNumValuesPerMVEntry = -1; + volatile int _varByteMVMaxRowLengthInBytes = -1; - void updateSVEntry() { + void updateSVNumValues() { _numValues++; } - void updateMVEntry(int numValuesInMVEntry) { + void updateMVNumValues(int numValuesInMVEntry) { _numValues += numValuesInMVEntry; _maxNumValuesPerMVEntry = Math.max(_maxNumValuesPerMVEntry, numValuesInMVEntry); } + + /** + * When an MV VarByte column is created with noDict, the realtime segment is still created with a dictionary. + * When the realtime segment is converted to offline segment, the offline segment creates a noDict column. + * MultiValueVarByteRawIndexCreator requires the maxRowLengthInBytes. Refer to OSS issue + * https://github.com/apache/pinot/issues/10127 for more details. + */ + void updateVarByteMVMaxRowLengthInBytes(Object entry, DataType dataType) { + // MV support for BigDecimal is not available. + if (dataType != STRING && dataType != BYTES) { + return; + } + + Object[] values = (Object[]) entry; + int rowLength = 0; + + switch (dataType) { + case STRING: { + for (Object obj : values) { + String value = (String) obj; + int length = value.getBytes(UTF_8).length; + rowLength += length; + } + + _varByteMVMaxRowLengthInBytes = Math.max(_varByteMVMaxRowLengthInBytes, rowLength); + break; + } + case BYTES: { + for (Object obj : values) { + ByteArray value = new ByteArray((byte[]) obj); + int length = value.length(); + rowLength += length; + } + + _varByteMVMaxRowLengthInBytes = Math.max(_varByteMVMaxRowLengthInBytes, rowLength); + break; + } + default: + throw new IllegalStateException("Invalid type=" + dataType); + } + } } private static Map<String, Pair<String, ValueAggregator>> getMetricsAggregators(RealtimeSegmentConfig segmentConfig) { @@ -1311,7 +1356,7 @@ public class MutableSegmentImpl implements MutableSegment { final FieldSpec _fieldSpec; final PartitionFunction _partitionFunction; final Set<Integer> _partitions; - final NumValuesInfo _numValuesInfo; + final ValuesInfo _valuesInfo; final MutableForwardIndex _forwardIndex; final MutableDictionary _dictionary; final MutableInvertedIndex _invertedIndex; @@ -1328,12 +1373,13 @@ public class MutableSegmentImpl implements MutableSegment { volatile Comparable _minValue; volatile Comparable _maxValue; + // Hold the dictionary id for the latest record int _dictId = Integer.MIN_VALUE; int[] _dictIds; IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction, - @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex, + @Nullable Set<Integer> partitions, ValuesInfo valuesInfo, MutableForwardIndex forwardIndex, @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex invertedIndex, @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex textIndex, @Nullable MutableTextIndex fstIndex, @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index h3Index, @@ -1342,7 +1388,7 @@ public class MutableSegmentImpl implements MutableSegment { _fieldSpec = fieldSpec; _partitionFunction = partitionFunction; _partitions = partitions; - _numValuesInfo = numValuesInfo; + _valuesInfo = valuesInfo; _forwardIndex = forwardIndex; _dictionary = dictionary; _invertedIndex = invertedIndex; @@ -1359,10 +1405,10 @@ public class MutableSegmentImpl implements MutableSegment { } DataSource toDataSource() { - return new MutableDataSource(_fieldSpec, _numDocsIndexed, _numValuesInfo._numValues, - _numValuesInfo._maxNumValuesPerMVEntry, _dictionary == null ? -1 : _dictionary.length(), _partitionFunction, + return new MutableDataSource(_fieldSpec, _numDocsIndexed, _valuesInfo._numValues, + _valuesInfo._maxNumValuesPerMVEntry, _dictionary == null ? -1 : _dictionary.length(), _partitionFunction, _partitions, _minValue, _maxValue, _forwardIndex, _dictionary, _invertedIndex, _rangeIndex, _textIndex, - _fstIndex, _jsonIndex, _h3Index, _bloomFilter, _nullValueVector); + _fstIndex, _jsonIndex, _h3Index, _bloomFilter, _nullValueVector, _valuesInfo._varByteMVMaxRowLengthInBytes); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java index cff6260b99..d41d55b37d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java @@ -188,6 +188,11 @@ public class MutableColumnStatistics implements ColumnStatistics { return _dataSource.getDataSourceMetadata().getPartitionFunction(); } + @Override + public int getMaxRowLengthInBytes() { + return _dataSource.getDataSourceMetadata().getMaxRowLengthInBytes(); + } + @Override public int getNumPartitions() { PartitionFunction partitionFunction = _dataSource.getDataSourceMetadata().getPartitionFunction(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java index c4a3c910b0..0cf4c37b96 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java @@ -83,7 +83,7 @@ public class MutableNoDictionaryColStatistics implements ColumnStatistics { @Override public int getMaxNumberOfMultiValues() { - return 0; + return _dataSourceMetadata.getMaxNumValuesPerMVEntry(); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java index 90bbb3b773..f09d8d3d42 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java @@ -46,11 +46,10 @@ public class MutableDataSource extends BaseDataSource { @Nullable InvertedIndexReader invertedIndex, @Nullable RangeIndexReader rangeIndex, @Nullable TextIndexReader textIndex, @Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, @Nullable H3IndexReader h3Index, @Nullable BloomFilterReader bloomFilter, - @Nullable NullValueVectorReader nullValueVector) { + @Nullable NullValueVectorReader nullValueVector, int maxRowLengthInBytes) { super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, cardinality, - partitionFunction, - partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, fstIndex, - jsonIndex, h3Index, bloomFilter, nullValueVector); + partitionFunction, partitions, minValue, maxValue, maxRowLengthInBytes), forwardIndex, dictionary, + invertedIndex, rangeIndex, textIndex, fstIndex, jsonIndex, h3Index, bloomFilter, nullValueVector); } private static class MutableDataSourceMetadata implements DataSourceMetadata { @@ -63,10 +62,11 @@ public class MutableDataSource extends BaseDataSource { final Set<Integer> _partitions; final Comparable _minValue; final Comparable _maxValue; + final int _maxRowLengthInBytes; MutableDataSourceMetadata(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry, int cardinality, @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, - @Nullable Comparable minValue, @Nullable Comparable maxValue) { + @Nullable Comparable minValue, @Nullable Comparable maxValue, int maxRowLengthInBytes) { _fieldSpec = fieldSpec; _numDocs = numDocs; _numValues = numValues; @@ -81,6 +81,7 @@ public class MutableDataSource extends BaseDataSource { _minValue = minValue; _maxValue = maxValue; _cardinality = cardinality; + _maxRowLengthInBytes = maxRowLengthInBytes; } @Override @@ -115,7 +116,6 @@ public class MutableDataSource extends BaseDataSource { return _minValue; } - @Nullable @Override public Comparable getMaxValue() { return _maxValue; @@ -137,5 +137,10 @@ public class MutableDataSource extends BaseDataSource { public int getCardinality() { return _cardinality; } + + @Override + public int getMaxRowLengthInBytes() { + return _maxRowLengthInBytes; + } } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java index 494129f0a1..3f14601d32 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSourceMetadata.java @@ -100,4 +100,12 @@ public interface DataSourceMetadata { * Returns the cardinality of the column, {@code -1} if not applicable */ int getCardinality(); + + /** + * Returns the max row length in bytes for a var byte MV column. {@code -1} if not applicable. + * @return + */ + default int getMaxRowLengthInBytes() { + return -1; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org