rohityadav1993 commented on code in PR #16344:
URL: https://github.com/apache/pinot/pull/16344#discussion_r2283156679


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java:
##########
@@ -38,28 +36,59 @@
 public class RealtimeSegmentStatsContainer implements SegmentPreIndexStatsContainer {
   private final MutableSegment _mutableSegment;
   private final Map<String, ColumnStatistics> _columnStatisticsMap = new HashMap<>();
+  private final int _totalDocCount;
 
-  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable int[] sortedDocIds,
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, int[] sortedDocIds,

Review Comment:
   sortedDocIds can be null based on its usage in RealtimeSegmentSegmentCreationDataSource, so the @Nullable annotation should be kept.
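
   A minimal signature sketch of the suggestion, assuming the class keeps the Nullable annotation it already imports (the delegating constructor comes from this PR):

   public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable int[] sortedDocIds,
       StatsCollectorConfig statsCollectorConfig) {
     this(mutableSegment, sortedDocIds, statsCollectorConfig, null);
   }

   public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable int[] sortedDocIds,
       StatsCollectorConfig statsCollectorConfig, @Nullable RecordReader recordReader) {
     // constructor body unchanged from the PR
   }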



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java:
##########
@@ -38,28 +36,59 @@
 public class RealtimeSegmentStatsContainer implements SegmentPreIndexStatsContainer {
   private final MutableSegment _mutableSegment;
   private final Map<String, ColumnStatistics> _columnStatisticsMap = new HashMap<>();
+  private final int _totalDocCount;
 
-  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, @Nullable int[] sortedDocIds,
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, int[] sortedDocIds,
       StatsCollectorConfig statsCollectorConfig) {
+    this(mutableSegment, sortedDocIds, statsCollectorConfig, null);
+  }
+
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, int[] sortedDocIds,
+      StatsCollectorConfig statsCollectorConfig, RecordReader recordReader) {
     _mutableSegment = mutableSegment;
 
+    // Determine if we're using compacted reader
+    boolean isUsingCompactedReader = recordReader instanceof CompactedPinotSegmentRecordReader;
+
+    // Determine the correct total document count based on whether compaction is being used
+    if (isUsingCompactedReader) {
+      // When using CompactedPinotSegmentRecordReader, use the valid document count
+      if (mutableSegment.getValidDocIds() != null) {
+        _totalDocCount = mutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+      } else {
+        _totalDocCount = mutableSegment.getNumDocsIndexed();
+      }
+    } else {
+      // Use the original total document count for non-compacted readers
+      _totalDocCount = mutableSegment.getNumDocsIndexed();
+    }
+
     // Create all column statistics
+    // Determine compaction mode once for all columns
+    boolean useCompactedStatistics = isUsingCompactedReader && mutableSegment.getValidDocIds() != null;
+    ThreadSafeMutableRoaringBitmap validDocIds = useCompactedStatistics ? mutableSegment.getValidDocIds() : null;
+
     for (String columnName : mutableSegment.getPhysicalColumnNames()) {
       DataSource dataSource = mutableSegment.getDataSource(columnName);
-      if (dataSource instanceof MutableMapDataSource) {
-        ForwardIndexReader reader = dataSource.getForwardIndex();
-        MapColumnPreIndexStatsCollector mapColumnPreIndexStatsCollector =

Review Comment:
   I think the stats collector logic for Map column types is getting removed here.
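
   A rough sketch of keeping that branch alongside the new compaction-aware path (the MapColumnPreIndexStatsCollector construction is elided here because the hunk is truncated):

   for (String columnName : mutableSegment.getPhysicalColumnNames()) {
     DataSource dataSource = mutableSegment.getDataSource(columnName);
     if (dataSource instanceof MutableMapDataSource) {
       // Keep the existing MapColumnPreIndexStatsCollector path for map-type columns,
       // constructed and registered as in the pre-PR code.
     } else {
       // New compaction-aware column statistics, passing validDocIds when compaction applies.
     }
   }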



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -96,23 +97,61 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
     _realtimeSegmentImpl.commit();
 
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
-      String sortedColumn = null;
-      List<String> columnSortOrder = genConfig.getColumnSortOrder();
-      if (CollectionUtils.isNotEmpty(columnSortOrder)) {
-        sortedColumn = columnSortOrder.get(0);
+
+    // Check if commit-time compaction is enabled for upsert tables
+    boolean useCompactedReader = TableConfigUtils.isCommitTimeCompactionEnabled(_tableConfig);
+
+    String sortedColumn = null;
+    List<String> columnSortOrder = genConfig.getColumnSortOrder();
+    if (columnSortOrder != null && !columnSortOrder.isEmpty()) {
+      sortedColumn = columnSortOrder.get(0);
+    }
+    int[] sortedDocIds =
+        sortedColumn != null ? _realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
+
+    if (useCompactedReader) {

Review Comment:
   there is potential for reducing code repetition here: recordReader can be initialised based on useCompactedReader, while most of the remaining segment-build logic is reusable between the two paths.
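
   One possible shape (a hypothetical sketch, not the PR's code; the compacted reader's constructor arguments are elided):

   try (RecordReader recordReader = useCompactedReader
       ? new CompactedPinotSegmentRecordReader(/* compacted-reader setup elided */)
       : new PinotSegmentRecordReader()) {
     // Shared for both readers: initialize the reader on _realtimeSegmentImpl with sortedDocIds,
     // configure and run the driver, then publish metrics once.
   }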



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -125,6 +164,34 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
     }
   }
 
+  /**
+   * Publishes detailed commit-time compaction metrics
+   */
+  private void publishCompactionMetrics(ServerMetrics serverMetrics, int preCompactionRowCount,
+      SegmentIndexCreationDriverImpl driver, long compactionStartTime) {
+    try {
+      int postCompactionRowCount = driver.getSegmentStats().getTotalDocCount();
+      long compactionProcessingTime = System.currentTimeMillis() - compactionStartTime;
+      int rowsRemoved = preCompactionRowCount - postCompactionRowCount;
+
+      // Publish basic row count metrics
+      serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_POST_COMPACTION,
+          postCompactionRowCount);
+      serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_ROWS_REMOVED, rowsRemoved);
+      serverMetrics.addMeteredTableValue(_tableName, ServerMeter.COMMIT_TIME_COMPACTION_BUILD_TIME_MS,
+          compactionProcessingTime);
+
+      // Calculate and publish compaction ratio percentage (only if we had rows to compact)
+      if (preCompactionRowCount > 0) {
+        double compactionRatioPercent = (double) rowsRemoved / preCompactionRowCount * 100.0;
+        serverMetrics.setOrUpdateTableGauge(_tableName, ServerGauge.COMMIT_TIME_COMPACTION_RATIO_PERCENT,
+            (long) compactionRatioPercent);
+      }
+    } catch (Exception e) {
+      //no-op.

Review Comment:
   can add a log here to help with debugging.
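
   For example (a sketch, assuming the class has or gets an slf4j LOGGER like other converter classes):

   } catch (Exception e) {
     LOGGER.warn("Failed to publish commit-time compaction metrics for table: {}", _tableName, e);
   }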


