tarun11Mavani commented on code in PR #16344:
URL: https://github.com/apache/pinot/pull/16344#discussion_r2284727860


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java:
##########
@@ -57,28 +67,118 @@ public CompactedPinotSegmentRecordReader(RoaringBitmap 
validDocIds,
     _deleteRecordColumn = deleteRecordColumn;
   }
 
+  public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap 
validDocIds) {
+    this(validDocIds, null);
+  }
+
+  public CompactedPinotSegmentRecordReader(ThreadSafeMutableRoaringBitmap 
validDocIds,
+      @Nullable String deleteRecordColumn) {
+    Preconditions.checkNotNull(validDocIds, "Valid document IDs cannot be 
null");
+    _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+    _validDocIdsBitmap = 
validDocIds.getMutableRoaringBitmap().toRoaringBitmap();
+    _validDocIdsIterator = _validDocIdsBitmap.getIntIterator();
+    _deleteRecordColumn = deleteRecordColumn;
+  }
+
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, 
@Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     // lazy init the record reader
     _pinotSegmentRecordReader.init(dataFile, null, null);
+    prepareSortedValidDocIds();
+  }
+
+  /**
+   * Initializes the record reader from a mutable segment with valid document 
ids and optional sorted document ids.
+   *
+   * @param mutableSegment Mutable segment
+   * @param sortedDocIds Array of sorted document ids (can be null)
+   */
+  public void init(MutableSegment mutableSegment, @Nullable int[] 
sortedDocIds) {
+    _pinotSegmentRecordReader.init(mutableSegment, sortedDocIds);
+    prepareSortedValidDocIds();
+  }
+
+  /**
+   * Prepares the sorted valid document IDs array based on whether sorted 
document IDs are available.
+   * If sorted document IDs are available, creates an array of valid document 
IDs in sorted order.
+   * If not available, falls back to bitmap iteration order.
+   */
+  private void prepareSortedValidDocIds() {
+    int[] sortedDocIds = _pinotSegmentRecordReader.getSortedDocIds();

Review Comment:
   Done.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -96,23 +97,61 @@ public void build(@Nullable SegmentVersion segmentVersion, 
@Nullable ServerMetri
     _realtimeSegmentImpl.commit();
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {
-      String sortedColumn = null;
-      List<String> columnSortOrder = genConfig.getColumnSortOrder();
-      if (CollectionUtils.isNotEmpty(columnSortOrder)) {
-        sortedColumn = columnSortOrder.get(0);
+
+    // Check if commit-time compaction is enabled for upsert tables
+    boolean useCompactedReader = 
TableConfigUtils.isCommitTimeCompactionEnabled(_tableConfig);
+
+    String sortedColumn = null;
+    List<String> columnSortOrder = genConfig.getColumnSortOrder();
+    if (columnSortOrder != null && !columnSortOrder.isEmpty()) {
+      sortedColumn = columnSortOrder.get(0);
+    }
+    int[] sortedDocIds =
+        sortedColumn != null ? 
_realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) 
: null;
+
+    if (useCompactedReader) {
+      // Collect metrics for commit-time compaction
+      long compactionStartTime = System.currentTimeMillis();
+      int preCompactionRowCount = _realtimeSegmentImpl.getNumDocsIndexed();
+      // Track that commit-time compaction is enabled for this segment
+      if (serverMetrics != null) {
+        serverMetrics.addMeteredTableValue(_tableName, 
ServerMeter.COMMIT_TIME_COMPACTION_ENABLED_SEGMENTS, 1L);
+        serverMetrics.addMeteredTableValue(_tableName, 
ServerMeter.COMMIT_TIME_COMPACTION_ROWS_PRE_COMPACTION,
+            preCompactionRowCount);
+      }
+
+      // Use CompactedPinotSegmentRecordReader to remove obsolete records
+      try (CompactedPinotSegmentRecordReader recordReader = new 
CompactedPinotSegmentRecordReader(
+          _realtimeSegmentImpl.getValidDocIds(), 
_realtimeSegmentImpl.getDeleteRecordColumn())) {
+        recordReader.init(_realtimeSegmentImpl, sortedDocIds);
+        RealtimeSegmentSegmentCreationDataSource dataSource =
+            new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, 
recordReader, sortedDocIds);
+        driver.init(genConfig, dataSource, 
TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
+
+        if (!_enableColumnMajor) {
+          driver.build();
+        } else {
+          driver.buildByColumn(_realtimeSegmentImpl);
+        }
       }
-      int[] sortedDocIds =
-          sortedColumn != null ? 
_realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) 
: null;
-      recordReader.init(_realtimeSegmentImpl, sortedDocIds);
-      RealtimeSegmentSegmentCreationDataSource dataSource =
-          new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, 
recordReader);
-      driver.init(genConfig, dataSource, 
TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
-
-      if (!_enableColumnMajor) {
-        driver.build();
-      } else {
-        driver.buildByColumn(_realtimeSegmentImpl);
+
+      // Collect and publish post-compaction metrics
+      if (serverMetrics != null) {
+        publishCompactionMetrics(serverMetrics, preCompactionRowCount, driver, 
compactionStartTime);
+      }
+    } else {
+      // Use regular PinotSegmentRecordReader (existing behavior)
+      try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {

Review Comment:
   Moved the common logic into buildSegmentWithReader and reused it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to