This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 547115ef925 Add consuming segment check for retention check (#17117)
547115ef925 is described below
commit 547115ef9253b57d1e087feaeedb94b7c50c3468
Author: tarun11Mavani <[email protected]>
AuthorDate: Thu Nov 13 20:37:17 2025 +0530
Add consuming segment check for retention check (#17117)
---
.../retention/strategy/TimeRetentionStrategy.java | 7 +++
.../helix/core/retention/RetentionManagerTest.java | 1 +
.../strategy/TimeRetentionStrategyTest.java | 71 ++++++++++++++++++++++
3 files changed, 79 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index cda94dbb5d5..d7c3acb4640 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -39,6 +39,13 @@ public class TimeRetentionStrategy implements RetentionStrategy {
@Override
public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
+
+ // For realtime tables, only completed segments(DONE or UPLOADED) are eligible for purging.
+ //For offline tables, status defaults to UPLOADED which is completed, so they proceed to normal retention
+ if (!segmentZKMetadata.getStatus().isCompleted()) {
+ return false; // Incomplete segments don't have final end time and should not be purged
+ }
+
return isPurgeable(tableNameWithType, segmentZKMetadata.getSegmentName(), segmentZKMetadata.getEndTimeMs());
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index ebe829231a0..782f50b0727 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -833,6 +833,7 @@ public class RetentionManagerTest {
when(segmentZKMetadata.getCreationTime()).thenReturn(creationTime);
when(segmentZKMetadata.getStartTimeMs()).thenReturn(timeUnit.toMillis(startTime));
when(segmentZKMetadata.getEndTimeMs()).thenReturn(timeUnit.toMillis(endTime));
+ when(segmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
return segmentZKMetadata;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
index 8ded0a0f1aa..70fa4900e20 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
@@ -20,8 +20,10 @@ package org.apache.pinot.controller.helix.core.retention.strategy;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -64,4 +66,73 @@ public class TimeRetentionStrategyTest {
segmentZKMetadata.setEndTime(today + (365 * 200));
assertFalse(retentionStrategy.isPurgeable(tableNameWithType, segmentZKMetadata));
}
+
+ @Test
+ public void testIncompleteSegmentRetention() {
+ String tableNameWithType = "myTable_REALTIME";
+ TimeRetentionStrategy retentionStrategy = new TimeRetentionStrategy(TimeUnit.DAYS, 30L);
+
+ // Test IN_PROGRESS segment (consuming segment)
+ SegmentZKMetadata consumingSegmentMetadata = new SegmentZKMetadata("myConsumingSegment");
+ consumingSegmentMetadata.setStatus(Status.IN_PROGRESS);
+
+ // Consuming segments have end time of -1, which would normally trigger the warning log
+ // But with our fix, consuming segments should not be purgeable regardless of end time
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, consumingSegmentMetadata));
+
+ // Test COMMITTING segment (pauseless ingestion)
+ SegmentZKMetadata committingSegmentMetadata = new SegmentZKMetadata("myCommittingSegment");
+ committingSegmentMetadata.setStatus(Status.COMMITTING);
+
+ // Committing segments also have end time of -1 until they are fully committed
+ // They should not be purgeable either
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, committingSegmentMetadata));
+
+ // Test with completed statuses to ensure they still follow normal retention logic
+ SegmentZKMetadata doneSegmentMetadata = new SegmentZKMetadata("myDoneSegment");
+ doneSegmentMetadata.setStatus(Status.DONE);
+ doneSegmentMetadata.setTimeUnit(TimeUnit.DAYS);
+
+ // Set end time to two months ago (should be purgeable for completed segments)
+ long today = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+ doneSegmentMetadata.setEndTime(today - 60);
+ assertTrue(retentionStrategy.isPurgeable(tableNameWithType, doneSegmentMetadata));
+
+ // Test UPLOADED status as well
+ SegmentZKMetadata uploadedSegmentMetadata = new SegmentZKMetadata("myUploadedSegment");
+ uploadedSegmentMetadata.setStatus(Status.UPLOADED);
+ uploadedSegmentMetadata.setTimeUnit(TimeUnit.DAYS);
+ uploadedSegmentMetadata.setEndTime(today - 60);
+ assertTrue(retentionStrategy.isPurgeable(tableNameWithType, uploadedSegmentMetadata));
+ }
+
+ @Test
+ public void testOfflineTableRetention() {
+ String tableNameWithType = "myTable_OFFLINE";
+ TimeRetentionStrategy retentionStrategy = new TimeRetentionStrategy(TimeUnit.DAYS, 30L);
+
+ // Test offline segment - these don't have status field set, so getStatus() returns default UPLOADED
+ // But we need to ensure they follow normal retention logic for offline tables
+ SegmentZKMetadata offlineSegmentMetadata = new SegmentZKMetadata("myOfflineSegment");
+ // Note: We don't set status for offline segments - it defaults to UPLOADED
+ offlineSegmentMetadata.setTimeUnit(TimeUnit.DAYS);
+
+ // Verify that offline segments have default status of UPLOADED
+ assertEquals(Status.UPLOADED, offlineSegmentMetadata.getStatus());
+ assertTrue(offlineSegmentMetadata.getStatus().isCompleted());
+
+ long today = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ // Set end time to two weeks ago (should not be purgeable - within retention period)
+ offlineSegmentMetadata.setEndTime(today - 14);
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, offlineSegmentMetadata));
+
+ // Set end time to two months ago (should be purgeable - beyond retention period)
+ offlineSegmentMetadata.setEndTime(today - 60);
+ assertTrue(retentionStrategy.isPurgeable(tableNameWithType, offlineSegmentMetadata));
+
+ // Test offline segment with invalid end time (should not be purgeable)
+ offlineSegmentMetadata.setEndTime(-1);
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, offlineSegmentMetadata));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]