klsince commented on code in PR #16344:
URL: https://github.com/apache/pinot/pull/16344#discussion_r2283178760
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -96,23 +97,61 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
     _realtimeSegmentImpl.commit();
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
-      String sortedColumn = null;
-      List<String> columnSortOrder = genConfig.getColumnSortOrder();
-      if (CollectionUtils.isNotEmpty(columnSortOrder)) {
-        sortedColumn = columnSortOrder.get(0);
+
+    // Check if commit-time compaction is enabled for upsert tables
+    boolean useCompactedReader = TableConfigUtils.isCommitTimeCompactionEnabled(_tableConfig);
+
+    String sortedColumn = null;
+    List<String> columnSortOrder = genConfig.getColumnSortOrder();
+    if (columnSortOrder != null && !columnSortOrder.isEmpty()) {
+      sortedColumn = columnSortOrder.get(0);
+    }
+    int[] sortedDocIds =
+        sortedColumn != null ? _realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
+
+    if (useCompactedReader) {
+      // Collect metrics for commit-time compaction
+      long compactionStartTime = System.currentTimeMillis();
+      int preCompactionRowCount = _realtimeSegmentImpl.getNumDocsIndexed();
+      // Track that commit-time compaction is enabled for this segment
+      if (serverMetrics != null) {
Review Comment:
I see you've removed the 'nullable' annotation from serverMetrics, so is this null-check still needed? Or should the 'nullable' annotation be kept instead?
Also consider emitting these two metrics inside publishCompactionMetrics() too; the method call may need to be wrapped in a finally block.
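For illustration, a rough sketch of that suggestion: emit everything from publishCompactionMetrics() and guard the call with a finally block. The helper buildCompactedSegment() is hypothetical; publishCompactionMetrics() and the two meters come from this diff.
```
long compactionStartTime = System.currentTimeMillis();
int preCompactionRowCount = _realtimeSegmentImpl.getNumDocsIndexed();
try {
  // hypothetical helper wrapping the try-with-resources build above
  buildCompactedSegment(driver, genConfig, sortedDocIds);
} finally {
  // publishCompactionMetrics() would also emit COMMIT_TIME_COMPACTION_ENABLED_SEGMENTS
  // and COMMIT_TIME_COMPACTION_ROWS_PRE_COMPACTION, so all compaction metrics are
  // published in one place even if the build throws
  publishCompactionMetrics(serverMetrics, preCompactionRowCount, driver, compactionStartTime);
}
```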
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -96,23 +97,61 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
     _realtimeSegmentImpl.commit();
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
-      String sortedColumn = null;
-      List<String> columnSortOrder = genConfig.getColumnSortOrder();
-      if (CollectionUtils.isNotEmpty(columnSortOrder)) {
-        sortedColumn = columnSortOrder.get(0);
+
+    // Check if commit-time compaction is enabled for upsert tables
+    boolean useCompactedReader = TableConfigUtils.isCommitTimeCompactionEnabled(_tableConfig);
+
+    String sortedColumn = null;
+    List<String> columnSortOrder = genConfig.getColumnSortOrder();
+    if (columnSortOrder != null && !columnSortOrder.isEmpty()) {
+      sortedColumn = columnSortOrder.get(0);
+    }
+    int[] sortedDocIds =
+        sortedColumn != null ? _realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
+
+    if (useCompactedReader) {
+      // Collect metrics for commit-time compaction
+      long compactionStartTime = System.currentTimeMillis();
+      int preCompactionRowCount = _realtimeSegmentImpl.getNumDocsIndexed();
+      // Track that commit-time compaction is enabled for this segment
+      if (serverMetrics != null) {
+        serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ENABLED_SEGMENTS, 1L);
+        serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_PRE_COMPACTION,
+            preCompactionRowCount);
+      }
+
+      // Use CompactedPinotSegmentRecordReader to remove obsolete records
+      try (CompactedPinotSegmentRecordReader recordReader = new CompactedPinotSegmentRecordReader(
+          _realtimeSegmentImpl.getValidDocIds(), _realtimeSegmentImpl.getDeleteRecordColumn())) {
+        recordReader.init(_realtimeSegmentImpl, sortedDocIds);
+        RealtimeSegmentSegmentCreationDataSource dataSource =
+            new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader, sortedDocIds);
+        driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
+
+        if (!_enableColumnMajor) {
+          driver.build();
+        } else {
+          driver.buildByColumn(_realtimeSegmentImpl);
+        }
       }
-      int[] sortedDocIds =
-          sortedColumn != null ? _realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
-      recordReader.init(_realtimeSegmentImpl, sortedDocIds);
-      RealtimeSegmentSegmentCreationDataSource dataSource =
-          new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader);
-      driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
-
-      if (!_enableColumnMajor) {
-        driver.build();
-      } else {
-        driver.buildByColumn(_realtimeSegmentImpl);
+
+      // Collect and publish post-compaction metrics
+      if (serverMetrics != null) {
+        publishCompactionMetrics(serverMetrics, preCompactionRowCount, driver, compactionStartTime);
+      }
+    } else {
+      // Use regular PinotSegmentRecordReader (existing behavior)
+      try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
Review Comment:
the major difference between the two if-else branches is how the recordReader is created, so consider reusing the logic that follows reader creation.
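For example, a sketch of that refactor, assuming RealtimeSegmentSegmentCreationDataSource accepts both readers via a common RecordReader type; the helper name is illustrative:
```
// shared by both branches once the reader has been created and initialized
private void buildSegment(SegmentIndexCreationDriverImpl driver, SegmentGeneratorConfig genConfig,
    RecordReader recordReader, int[] sortedDocIds)
    throws Exception {
  RealtimeSegmentSegmentCreationDataSource dataSource =
      new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader, sortedDocIds);
  driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
  if (!_enableColumnMajor) {
    driver.build();
  } else {
    driver.buildByColumn(_realtimeSegmentImpl);
  }
}
```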
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedRawIndexDictColumnStatistics.java:
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+
+
+/**
+ * Column statistics for dictionary columns with raw (non-dictionary-encoded) forward indexes.
+ * Reads raw values from forward index and maps them to dictionary IDs.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is NOT dictionary-encoded (forwardIndex.isDictionaryEncoded() == false)
Review Comment:
for my learning, was this particular combination handled before?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedRawIndexDictColumnStatistics.java:
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+
+
+/**
+ * Column statistics for dictionary columns with raw (non-dictionary-encoded) forward indexes.
+ * Reads raw values from forward index and maps them to dictionary IDs.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is NOT dictionary-encoded (forwardIndex.isDictionaryEncoded() == false)
+ * - Commit-time compaction is enabled
+ *
+ * Common scenarios:
+ * - Multi-value columns where forward index stores raw values but dictionary exists for other operations
+ * - Variable-length string columns optimized for sequential access
+ * - Consuming segments where dictionary is built separately from forward index
+ */
+public class CompactedRawIndexDictColumnStatistics extends MutableColumnStatistics {
+ private final Set<Integer> _usedDictIds;
+ private final int _compactedCardinality;
+ private final Object _compactedUniqueValues;
+
+ public CompactedRawIndexDictColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+ ThreadSafeMutableRoaringBitmap validDocIds) {
+ super(dataSource, sortedDocIds);
+
+ String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+ Dictionary dictionary = dataSource.getDictionary();
+ MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+
+ // Since forward index is not dictionary-encoded, we need to read raw values and map them to dictionary IDs
+ _usedDictIds = new HashSet<>();
+ int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
Review Comment:
use an iterator over the bitmap instead of creating an array copy?
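A minimal sketch of the iterator-based loop, using org.roaringbitmap's IntIterator and the same per-document logic as the array-based version:
```
// iterate valid doc ids without materializing an int[] copy of the bitmap
org.roaringbitmap.IntIterator it = validDocIds.getMutableRoaringBitmap().getIntIterator();
while (it.hasNext()) {
  int docId = it.next();
  Object rawValue = getRawValue(forwardIndex, docId);
  if (rawValue != null) {
    int dictId = getDictIdForValue(dictionary, rawValue, forwardIndex.getStoredType());
    if (dictId >= 0) {
      _usedDictIds.add(dictId);
    }
  }
}
```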
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java:
##########
@@ -38,28 +36,59 @@
 public class RealtimeSegmentStatsContainer implements SegmentPreIndexStatsContainer {
   private final MutableSegment _mutableSegment;
   private final Map<String, ColumnStatistics> _columnStatisticsMap = new HashMap<>();
+  private final int _totalDocCount;
-  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable int[] sortedDocIds,
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, int[] sortedDocIds,
       StatsCollectorConfig statsCollectorConfig) {
+    this(mutableSegment, sortedDocIds, statsCollectorConfig, null);
+  }
+
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, int[] sortedDocIds,
+      StatsCollectorConfig statsCollectorConfig, RecordReader recordReader) {
     _mutableSegment = mutableSegment;
+    // Determine if we're using compacted reader
+    boolean isUsingCompactedReader = recordReader instanceof CompactedPinotSegmentRecordReader;
+
+    // Determine the correct total document count based on whether compaction is being used
+    if (isUsingCompactedReader) {
Review Comment:
nit:
```
if (isUsingCompactedReader && mutableSegment.getValidDocIds() != null) {
  _totalDocCount = mutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
} else {
  _totalDocCount = mutableSegment.getNumDocsIndexed();
}
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java:
##########
@@ -38,28 +36,59 @@
 public class RealtimeSegmentStatsContainer implements SegmentPreIndexStatsContainer {
   private final MutableSegment _mutableSegment;
   private final Map<String, ColumnStatistics> _columnStatisticsMap = new HashMap<>();
+  private final int _totalDocCount;
-  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable int[] sortedDocIds,
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, int[] sortedDocIds,
       StatsCollectorConfig statsCollectorConfig) {
+    this(mutableSegment, sortedDocIds, statsCollectorConfig, null);
+  }
+
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, int[] sortedDocIds,
+      StatsCollectorConfig statsCollectorConfig, RecordReader recordReader) {
     _mutableSegment = mutableSegment;
+    // Determine if we're using compacted reader
+    boolean isUsingCompactedReader = recordReader instanceof CompactedPinotSegmentRecordReader;
+
+    // Determine the correct total document count based on whether compaction is being used
+    if (isUsingCompactedReader) {
+      // When using CompactedPinotSegmentRecordReader, use the valid document count
+      if (mutableSegment.getValidDocIds() != null) {
+        _totalDocCount = mutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+      } else {
+        _totalDocCount = mutableSegment.getNumDocsIndexed();
+      }
+    } else {
+      // Use the original total document count for non-compacted readers
+      _totalDocCount = mutableSegment.getNumDocsIndexed();
+    }
+
     // Create all column statistics
+    // Determine compaction mode once for all columns
+    boolean useCompactedStatistics = isUsingCompactedReader && mutableSegment.getValidDocIds() != null;
+    ThreadSafeMutableRoaringBitmap validDocIds = useCompactedStatistics ? mutableSegment.getValidDocIds() : null;
+
     for (String columnName : mutableSegment.getPhysicalColumnNames()) {
       DataSource dataSource = mutableSegment.getDataSource(columnName);
-      if (dataSource instanceof MutableMapDataSource) {
-        ForwardIndexReader reader = dataSource.getForwardIndex();
-        MapColumnPreIndexStatsCollector mapColumnPreIndexStatsCollector =
-            new MapColumnPreIndexStatsCollector(dataSource.getColumnName(), statsCollectorConfig);
-        int numDocs = dataSource.getDataSourceMetadata().getNumDocs();
-        ForwardIndexReaderContext readerContext = reader.createContext();
-        for (int row = 0; row < numDocs; row++) {
-          mapColumnPreIndexStatsCollector.collect(reader.getMap(row, readerContext));
+
+      if (dataSource.getDictionary() != null) {
+        // Dictionary columns
+        if (useCompactedStatistics) {
Review Comment:
nit: make a wrapper class, e.g. MutableCompactedColumnStatistics, that handles this dict-enabled check and calls the corresponding classes
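One possible shape for such a wrapper; the class name comes from this comment, and the delegation targets are the two statistics classes added in this PR:
```
public class MutableCompactedColumnStatistics extends MutableColumnStatistics {
  private final MutableColumnStatistics _delegate;

  public MutableCompactedColumnStatistics(DataSource dataSource, int[] sortedDocIds,
      ThreadSafeMutableRoaringBitmap validDocIds) {
    super(dataSource, sortedDocIds);
    // pick the implementation based on how the forward index is encoded
    _delegate = dataSource.getForwardIndex().isDictionaryEncoded()
        ? new CompactedDictEncodedColumnStatistics(dataSource, sortedDocIds, validDocIds)
        : new CompactedRawIndexDictColumnStatistics(dataSource, sortedDocIds, validDocIds);
  }

  @Override
  public int getCardinality() {
    return _delegate.getCardinality();
  }

  // ... delegate the remaining overrides (getUniqueValuesSet(), min/max, etc.) the same way
}
```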
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedDictEncodedColumnStatistics.java:
##########
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Column statistics for dictionary columns with dictionary-encoded forward indexes.
+ * Uses direct getDictId() calls for single-value columns and getDictIdMV() calls for multi-value columns
+ * to find which dictionary entries are used by valid documents.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is dictionary-encoded (forwardIndex.isDictionaryEncoded() == true)
+ * - Commit-time compaction is enabled
+ */
+public class CompactedDictEncodedColumnStatistics extends MutableColumnStatistics {
+ private final Set<Integer> _usedDictIds;
+ private final int _compactedCardinality;
+ private final DataSource _dataSource;
+ private final Object _compactedUniqueValues;
+ private int _maxNumberOfMultiValues = 1; // Track max multi-values for buffer allocation
+ private int _totalNumberOfEntries = 0; // Total number of entries in the column
+ private Object _minValue; // Track min value from valid documents
+ private Object _maxValue; // Track max value from valid documents
+
+ public CompactedDictEncodedColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+ ThreadSafeMutableRoaringBitmap validDocIds) {
+ super(dataSource, sortedDocIds);
+ _dataSource = dataSource;
+
+ String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+
+ // Find which dictionary IDs are actually used by valid documents
+ _usedDictIds = new HashSet<>();
+ MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+ Dictionary dictionary = dataSource.getDictionary();
+
+ // Iterate through valid document IDs
+ int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
+ boolean isSingleValue = forwardIndex.isSingleValue();
+
+ for (int docId : validDocIdsArray) {
+ if (isSingleValue) {
+ // Single-value column: use getDictId()
+ int dictId = forwardIndex.getDictId(docId);
+ _usedDictIds.add(dictId);
+ _totalNumberOfEntries++; // Count each valid document
+
+ // Track min/max values
+ Object value = dictionary.get(dictId);
+ updateMinMaxValue(value);
+ } else {
+ // Multi-value column: use getDictIdMV()
+ int[] dictIds = forwardIndex.getDictIdMV(docId);
+ _totalNumberOfEntries += dictIds.length; // Count all values in this document
+ _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, dictIds.length);
+ for (int dictId : dictIds) {
+ _usedDictIds.add(dictId);
+
+ // Track min/max values
+ Object value = dictionary.get(dictId);
+ updateMinMaxValue(value);
+ }
+ }
+ }
+
+ _compactedCardinality = _usedDictIds.size();
+
+ // Create compacted unique values array with only used dictionary values
+ Object originalValues = dictionary.getSortedValues();
Review Comment:
since originalValues is a sorted array, perhaps just iterate it and keep the entries whose dict ids are in _usedDictIds to get the _compactedUniqueValues array?
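A sketch of that single-pass approach; it assumes a type-dispatch lookup like the getDictIdForValue() helper in CompactedRawIndexDictColumnStatistics is available here, and it keeps the result value-sorted without the intermediate list, sort, or wrapper class:
```
// walk the already-sorted dictionary values once, keeping only used dict ids
Object originalValues = dictionary.getSortedValues();
Object compacted = Array.newInstance(originalValues.getClass().getComponentType(), _compactedCardinality);
int outIndex = 0;
for (int i = 0; i < Array.getLength(originalValues); i++) {
  Object value = Array.get(originalValues, i);
  if (_usedDictIds.contains(getDictIdForValue(dictionary, value, forwardIndex.getStoredType()))) {
    Array.set(compacted, outIndex++, value);
  }
}
_compactedUniqueValues = compacted;
```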
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedDictEncodedColumnStatistics.java:
##########
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+
+
+/**
+ * Column statistics for dictionary columns with dictionary-encoded forward indexes.
+ * Uses direct getDictId() calls for single-value columns and getDictIdMV() calls for multi-value columns
+ * to find which dictionary entries are used by valid documents.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is dictionary-encoded (forwardIndex.isDictionaryEncoded() == true)
+ * - Commit-time compaction is enabled
+ */
+public class CompactedDictEncodedColumnStatistics extends MutableColumnStatistics {
+ private final Set<Integer> _usedDictIds;
+ private final int _compactedCardinality;
+ private final DataSource _dataSource;
+ private final Object _compactedUniqueValues;
+ private int _maxNumberOfMultiValues = 1; // Track max multi-values for buffer allocation
+ private int _totalNumberOfEntries = 0; // Total number of entries in the column
+ private Object _minValue; // Track min value from valid documents
+ private Object _maxValue; // Track max value from valid documents
+
+ public CompactedDictEncodedColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+ ThreadSafeMutableRoaringBitmap validDocIds) {
+ super(dataSource, sortedDocIds);
+ _dataSource = dataSource;
+
+ String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+
+ // Find which dictionary IDs are actually used by valid documents
+ _usedDictIds = new HashSet<>();
+ MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+ Dictionary dictionary = dataSource.getDictionary();
+
+ // Iterate through valid document IDs
+ int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
+ boolean isSingleValue = forwardIndex.isSingleValue();
+
+ for (int docId : validDocIdsArray) {
+ if (isSingleValue) {
+ // Single-value column: use getDictId()
+ int dictId = forwardIndex.getDictId(docId);
+ _usedDictIds.add(dictId);
+ _totalNumberOfEntries++; // Count each valid document
+
+ // Track min/max values
+ Object value = dictionary.get(dictId);
+ updateMinMaxValue(value);
+ } else {
+ // Multi-value column: use getDictIdMV()
+ int[] dictIds = forwardIndex.getDictIdMV(docId);
+ _totalNumberOfEntries += dictIds.length; // Count all values in this document
+ _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, dictIds.length);
+ for (int dictId : dictIds) {
+ _usedDictIds.add(dictId);
+
+ // Track min/max values
+ Object value = dictionary.get(dictId);
+ updateMinMaxValue(value);
+ }
+ }
+ }
+
+ _compactedCardinality = _usedDictIds.size();
+
+ // Create compacted unique values array with only used dictionary values
+ Object originalValues = dictionary.getSortedValues();
+
+ // Extract the used values and sort them by value (not by dictionary ID)
+ List<ValueWithOriginalId<Comparable<Object>>> usedValuesWithIds = new ArrayList<>();
+ for (Integer dictId : _usedDictIds) {
+ Comparable<Object> value = (Comparable<Object>) dictionary.get(dictId);
+ usedValuesWithIds.add(new ValueWithOriginalId<>(value, dictId));
+ }
+
+ // Sort by values to ensure the compacted array is value-sorted
+ usedValuesWithIds.sort(Comparator.comparing(a -> a._value));
+
+ // Create a compacted array containing only the used dictionary values in sorted order by value
+ Class<?> componentType = originalValues.getClass().getComponentType();
+ Object compacted = Array.newInstance(componentType, _compactedCardinality);
+
+ for (int i = 0; i < _compactedCardinality; i++) {
+ ValueWithOriginalId<Comparable<Object>> entry = usedValuesWithIds.get(i);
+ Array.set(compacted, i, entry._value);
+ }
+ _compactedUniqueValues = compacted;
+ }
+
+ @Override
+ public int getCardinality() {
+ return _compactedCardinality;
+ }
+
+ @Override
+ public Object getUniqueValuesSet() {
+ return _compactedUniqueValues;
+ }
+
+ @Override
+ public int getMaxNumberOfMultiValues() {
+ return _maxNumberOfMultiValues;
+ }
+
+ @Override
+ public int getTotalNumberOfEntries() {
+ return _totalNumberOfEntries;
+ }
+
+ @Override
+ public Object getMinValue() {
+ return _minValue;
+ }
+
+ @Override
+ public Object getMaxValue() {
+ return _maxValue;
+ }
+
+ /**
+ * Wrapper class to hold a value with its original dictionary ID for type-safe sorting.
+ */
+ class ValueWithOriginalId<T extends Comparable<? super T>> {
+ final T _value;
+ final int _originalId;
Review Comment:
_originalId is never used?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/CompactedRawIndexDictColumnStatistics.java:
##########
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.converter.stats;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+
+
+/**
+ * Column statistics for dictionary columns with raw (non-dictionary-encoded) forward indexes.
+ * Reads raw values from forward index and maps them to dictionary IDs.
+ *
+ * This is used when:
+ * - Column has a dictionary (dataSource.getDictionary() != null)
+ * - Forward index is NOT dictionary-encoded (forwardIndex.isDictionaryEncoded() == false)
+ * - Commit-time compaction is enabled
+ *
+ * Common scenarios:
+ * - Multi-value columns where forward index stores raw values but dictionary exists for other operations
+ * - Variable-length string columns optimized for sequential access
+ * - Consuming segments where dictionary is built separately from forward index
+ */
+public class CompactedRawIndexDictColumnStatistics extends MutableColumnStatistics {
+ private final Set<Integer> _usedDictIds;
+ private final int _compactedCardinality;
+ private final Object _compactedUniqueValues;
+
+ public CompactedRawIndexDictColumnStatistics(DataSource dataSource, int[] sortedDocIds,
+ ThreadSafeMutableRoaringBitmap validDocIds) {
+ super(dataSource, sortedDocIds);
+
+ String columnName = dataSource.getDataSourceMetadata().getFieldSpec().getName();
+ Dictionary dictionary = dataSource.getDictionary();
+ MutableForwardIndex forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+
+ // Since forward index is not dictionary-encoded, we need to read raw values and map them to dictionary IDs
+ _usedDictIds = new HashSet<>();
+ int[] validDocIdsArray = validDocIds.getMutableRoaringBitmap().toArray();
+
+ // Read raw values from valid documents and find corresponding dictionary IDs
+ for (int docId : validDocIdsArray) {
+ Object rawValue = getRawValue(forwardIndex, docId);
+ if (rawValue != null) {
+ // Find the dictionary ID for this raw value using type-specific lookup
+ int dictId = getDictIdForValue(dictionary, rawValue, forwardIndex.getStoredType());
+ if (dictId >= 0) {
+ _usedDictIds.add(dictId);
+ }
+ }
+ }
+
+ _compactedCardinality = _usedDictIds.size();
+
+ // Create compacted unique values array with type-safe sorting
+ Object originalValues = dictionary.getSortedValues();
+ Class<?> componentType = originalValues.getClass().getComponentType();
+ // The cast is safe because dictionary values are always Comparable
+ List<ValueWithOriginalId<Comparable<Object>>> usedValuesWithIds = new ArrayList<>();
+ for (Integer dictId : _usedDictIds) {
+ Comparable<Object> value = (Comparable<Object>) dictionary.get(dictId);
+ usedValuesWithIds.add(new ValueWithOriginalId<>(value, dictId));
+ }
+ // Sort by values to ensure the compacted array is value-sorted
+ usedValuesWithIds.sort(Comparator.comparing(a -> a._value));
+ // Create a compacted array containing only the used dictionary values in sorted order by value
+ Object compacted = Array.newInstance(componentType, _compactedCardinality);
+ for (int i = 0; i < _compactedCardinality; i++) {
+ Array.set(compacted, i, usedValuesWithIds.get(i)._value);
+ }
+ _compactedUniqueValues = compacted;
+ }
+
+ private Object getRawValue(MutableForwardIndex forwardIndex, int docId) {
+ switch (forwardIndex.getStoredType()) {
+ case INT:
+ return forwardIndex.getInt(docId);
+ case LONG:
+ return forwardIndex.getLong(docId);
+ case FLOAT:
+ return forwardIndex.getFloat(docId);
+ case DOUBLE:
+ return forwardIndex.getDouble(docId);
+ case STRING:
+ return forwardIndex.getString(docId);
+ case BYTES:
+ return forwardIndex.getBytes(docId);
+ case BIG_DECIMAL:
+ return forwardIndex.getBigDecimal(docId);
+ default:
+ throw new IllegalStateException("Unsupported data type: " + forwardIndex.getStoredType());
+ }
+ }
+
+ private int getDictIdForValue(Dictionary dictionary, Object value, DataType dataType) {
+ switch (dataType) {
+ case INT:
+ return dictionary.indexOf((Integer) value);
+ case LONG:
+ return dictionary.indexOf((Long) value);
+ case FLOAT:
+ return dictionary.indexOf((Float) value);
+ case DOUBLE:
+ return dictionary.indexOf((Double) value);
+ case STRING:
+ return dictionary.indexOf((String) value);
+ case BYTES:
+ return dictionary.indexOf(new ByteArray((byte[]) value));
+ case BIG_DECIMAL:
+ return dictionary.indexOf((BigDecimal) value);
+ default:
+ throw new IllegalStateException("Unsupported data type: " + dataType);
+ }
+ }
+
+ @Override
+ public int getCardinality() {
+ return _compactedCardinality;
+ }
+
+ @Override
+ public Object getUniqueValuesSet() {
+ return _compactedUniqueValues;
+ }
+
+ /**
+ * Wrapper class to hold a value with its original dictionary ID for type-safe sorting.
+ */
+ class ValueWithOriginalId<T extends Comparable<? super T>> {
+ final T _value;
+ final int _originalId;
Review Comment:
not used?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java:
##########
@@ -57,28 +67,118 @@ public CompactedPinotSegmentRecordReader(RoaringBitmap validDocIds,
_deleteRecordColumn = deleteRecordColumn;
}
+ public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap validDocIds) {
+ this(validDocIds, null);
+ }
+
+ public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable String deleteRecordColumn) {
+ Preconditions.checkNotNull(validDocIds, "Valid document IDs cannot be null");
+ _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+ _validDocIdsBitmap = validDocIds.getMutableRoaringBitmap().toRoaringBitmap();
+ _validDocIdsIterator = _validDocIdsBitmap.getIntIterator();
+ _deleteRecordColumn = deleteRecordColumn;
+ }
+
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
// lazy init the record reader
_pinotSegmentRecordReader.init(dataFile, null, null);
+ prepareSortedValidDocIds();
+ }
+
+ /**
+ * Initializes the record reader from a mutable segment with valid document ids and optional sorted document ids.
+ *
+ * @param mutableSegment Mutable segment
+ * @param sortedDocIds Array of sorted document ids (can be null)
+ */
+ public void init(MutableSegment mutableSegment, @Nullable int[] sortedDocIds) {
+ _pinotSegmentRecordReader.init(mutableSegment, sortedDocIds);
+ prepareSortedValidDocIds();
+ }
+
+ /**
+ * Prepares the sorted valid document IDs array based on whether sorted document IDs are available.
+ * If sorted document IDs are available, creates an array of valid document IDs in sorted order.
+ * If not available, falls back to bitmap iteration order.
+ */
+ private void prepareSortedValidDocIds() {
+ int[] sortedDocIds = _pinotSegmentRecordReader.getSortedDocIds();
+ if (sortedDocIds != null) {
+ // Create array of valid document IDs in sorted order
+ List<Integer> sortedValidDocIdsList = new ArrayList<>();
+ for (int docId : sortedDocIds) {
+ if (_validDocIdsBitmap.contains(docId)) {
+ sortedValidDocIdsList.add(docId);
+ }
+ }
+ _sortedValidDocIds = sortedValidDocIdsList.stream().mapToInt(Integer::intValue).toArray();
+ } else {
+ // No sorted order available, use bitmap iteration order (existing behavior)
+ _sortedValidDocIds = null;
+ }
+ _currentDocIndex = 0;
Review Comment:
looks like this index is not updated by this method, so why reset it?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java:
##########
@@ -57,28 +67,118 @@ public CompactedPinotSegmentRecordReader(RoaringBitmap validDocIds,
_deleteRecordColumn = deleteRecordColumn;
}
+ public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap validDocIds) {
+ this(validDocIds, null);
+ }
+
+ public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable String deleteRecordColumn) {
+ Preconditions.checkNotNull(validDocIds, "Valid document IDs cannot be null");
+ _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+ _validDocIdsBitmap = validDocIds.getMutableRoaringBitmap().toRoaringBitmap();
+ _validDocIdsIterator = _validDocIdsBitmap.getIntIterator();
+ _deleteRecordColumn = deleteRecordColumn;
+ }
+
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
// lazy init the record reader
_pinotSegmentRecordReader.init(dataFile, null, null);
+ prepareSortedValidDocIds();
+ }
+
+ /**
+ * Initializes the record reader from a mutable segment with valid document ids and optional sorted document ids.
+ *
+ * @param mutableSegment Mutable segment
+ * @param sortedDocIds Array of sorted document ids (can be null)
+ */
+ public void init(MutableSegment mutableSegment, @Nullable int[] sortedDocIds) {
+ _pinotSegmentRecordReader.init(mutableSegment, sortedDocIds);
+ prepareSortedValidDocIds();
+ }
+
+ /**
+ * Prepares the sorted valid document IDs array based on whether sorted document IDs are available.
+ * If sorted document IDs are available, creates an array of valid document IDs in sorted order.
+ * If not available, falls back to bitmap iteration order.
+ */
+ private void prepareSortedValidDocIds() {
+ int[] sortedDocIds = _pinotSegmentRecordReader.getSortedDocIds();
Review Comment:
looks like we can create the _sortedValidDocIds array directly, sized with _validDocIdsBitmap.getCardinality(), to avoid creating a temp ArrayList
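A sketch of the array-based version; it sizes the result from the bitmap cardinality up front, with a copyOf trim only as a safety net in case some valid ids are missing from sortedDocIds:
```
// fill a pre-sized array instead of boxing into an ArrayList<Integer>
int[] sortedValidDocIds = new int[_validDocIdsBitmap.getCardinality()];
int count = 0;
for (int docId : sortedDocIds) {
  if (_validDocIdsBitmap.contains(docId)) {
    sortedValidDocIds[count++] = docId;
  }
}
_sortedValidDocIds = count == sortedValidDocIds.length
    ? sortedValidDocIds : java.util.Arrays.copyOf(sortedValidDocIds, count);
```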
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]