This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 547115ef925 Add consuming segment check for retention check (#17117)
547115ef925 is described below

commit 547115ef9253b57d1e087feaeedb94b7c50c3468
Author: tarun11Mavani <[email protected]>
AuthorDate: Thu Nov 13 20:37:17 2025 +0530

    Add consuming segment check for retention check (#17117)
---
 .../retention/strategy/TimeRetentionStrategy.java  |  7 +++
 .../helix/core/retention/RetentionManagerTest.java |  1 +
 .../strategy/TimeRetentionStrategyTest.java        | 71 ++++++++++++++++++++++
 3 files changed, 79 insertions(+)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index cda94dbb5d5..d7c3acb4640 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -39,6 +39,13 @@ public class TimeRetentionStrategy implements 
RetentionStrategy {
 
   @Override
   public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata 
segmentZKMetadata) {
+
+    // For realtime tables, only completed segments (DONE or UPLOADED) are 
eligible for purging.
+    // For offline tables, status defaults to UPLOADED, which is completed, so 
they proceed to normal retention.
+    if (!segmentZKMetadata.getStatus().isCompleted()) {
+      return false; // Incomplete segments don't have a final end time and 
should not be purged
+    }
+
     return isPurgeable(tableNameWithType, segmentZKMetadata.getSegmentName(), 
segmentZKMetadata.getEndTimeMs());
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index ebe829231a0..782f50b0727 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -833,6 +833,7 @@ public class RetentionManagerTest {
     when(segmentZKMetadata.getCreationTime()).thenReturn(creationTime);
     
when(segmentZKMetadata.getStartTimeMs()).thenReturn(timeUnit.toMillis(startTime));
     
when(segmentZKMetadata.getEndTimeMs()).thenReturn(timeUnit.toMillis(endTime));
+    
when(segmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
     return segmentZKMetadata;
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
index 8ded0a0f1aa..70fa4900e20 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
@@ -20,8 +20,10 @@ package 
org.apache.pinot.controller.helix.core.retention.strategy;
 
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -64,4 +66,73 @@ public class TimeRetentionStrategyTest {
     segmentZKMetadata.setEndTime(today + (365 * 200));
     assertFalse(retentionStrategy.isPurgeable(tableNameWithType, 
segmentZKMetadata));
   }
+
+  @Test
+  public void testIncompleteSegmentRetention() {
+    String tableNameWithType = "myTable_REALTIME";
+    TimeRetentionStrategy retentionStrategy = new 
TimeRetentionStrategy(TimeUnit.DAYS, 30L);
+
+    // Test IN_PROGRESS segment (consuming segment)
+    SegmentZKMetadata consumingSegmentMetadata = new 
SegmentZKMetadata("myConsumingSegment");
+    consumingSegmentMetadata.setStatus(Status.IN_PROGRESS);
+
+    // Consuming segments have an end time of -1, which would normally trigger 
the warning log.
+    // With the status check in place, consuming segments are not purgeable regardless 
of end time.
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, 
consumingSegmentMetadata));
+
+    // Test COMMITTING segment (pauseless ingestion)
+    SegmentZKMetadata committingSegmentMetadata = new 
SegmentZKMetadata("myCommittingSegment");
+    committingSegmentMetadata.setStatus(Status.COMMITTING);
+
+    // Committing segments also have end time of -1 until they are fully 
committed
+    // They should not be purgeable either
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, 
committingSegmentMetadata));
+
+    // Test with completed statuses to ensure they still follow normal 
retention logic
+    SegmentZKMetadata doneSegmentMetadata = new 
SegmentZKMetadata("myDoneSegment");
+    doneSegmentMetadata.setStatus(Status.DONE);
+    doneSegmentMetadata.setTimeUnit(TimeUnit.DAYS);
+
+    // Set end time to two months ago (should be purgeable for completed 
segments)
+    long today = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+    doneSegmentMetadata.setEndTime(today - 60);
+    assertTrue(retentionStrategy.isPurgeable(tableNameWithType, 
doneSegmentMetadata));
+
+    // Test UPLOADED status as well
+    SegmentZKMetadata uploadedSegmentMetadata = new 
SegmentZKMetadata("myUploadedSegment");
+    uploadedSegmentMetadata.setStatus(Status.UPLOADED);
+    uploadedSegmentMetadata.setTimeUnit(TimeUnit.DAYS);
+    uploadedSegmentMetadata.setEndTime(today - 60);
+    assertTrue(retentionStrategy.isPurgeable(tableNameWithType, 
uploadedSegmentMetadata));
+  }
+
+  @Test
+  public void testOfflineTableRetention() {
+    String tableNameWithType = "myTable_OFFLINE";
+    TimeRetentionStrategy retentionStrategy = new 
TimeRetentionStrategy(TimeUnit.DAYS, 30L);
+
+    // Test offline segment - these don't have status field set, so 
getStatus() returns default UPLOADED
+    // But we need to ensure they follow normal retention logic for offline 
tables
+    SegmentZKMetadata offlineSegmentMetadata = new 
SegmentZKMetadata("myOfflineSegment");
+    // Note: We don't set status for offline segments - it defaults to UPLOADED
+    offlineSegmentMetadata.setTimeUnit(TimeUnit.DAYS);
+
+    // Verify that offline segments have default status of UPLOADED
+    assertEquals(Status.UPLOADED, offlineSegmentMetadata.getStatus());
+    assertTrue(offlineSegmentMetadata.getStatus().isCompleted());
+
+    long today = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    // Set end time to two weeks ago (should not be purgeable - within 
retention period)
+    offlineSegmentMetadata.setEndTime(today - 14);
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, 
offlineSegmentMetadata));
+
+    // Set end time to two months ago (should be purgeable - beyond retention 
period)
+    offlineSegmentMetadata.setEndTime(today - 60);
+    assertTrue(retentionStrategy.isPurgeable(tableNameWithType, 
offlineSegmentMetadata));
+
+    // Test offline segment with invalid end time (should not be purgeable)
+    offlineSegmentMetadata.setEndTime(-1);
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, 
offlineSegmentMetadata));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to