This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 16efb8021be Honor replacedSegmentsRetentionPeriod for all table types 
(#18583)
16efb8021be is described below

commit 16efb8021be5570a5fa898bc16f7d488fc5ada7b
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Jun 4 02:39:05 2026 +0530

    Honor replacedSegmentsRetentionPeriod for all table types (#18583)
    
    * Enable replacedSegmentsRetentionPeriod for all table types
    
    * Configure replacedSegmentsRetentionPeriod=0s in tests that asserted 
immediate APPEND-table cleanup
    
    The replaced-segments retention window now applies uniformly to all batch 
ingestion types,
    so tests that previously observed immediate cleanup on APPEND tables must 
explicitly opt
    into the immediate-cleanup behavior via the table config.
    
    * Add test that validates replacedSegmentsRetentionPeriod gates lineage 
cleanup
    
    Configures a separate OFFLINE table with a 2-second window. First retention 
pass runs
    inside the window and must keep the replaced segments; the second pass, run 
after the
    window elapses, must delete them.
    
    * Resolve unconfigured replacedSegmentsRetentionPeriod default per 
ingestion type
    
    When replacedSegmentsRetentionPeriod is set on a table it continues to 
apply to
    every table type. When it is unset, the default is now resolved per 
ingestion
    type: 24 hours for REFRESH tables (their replaced segments back a potential
    lineage rollback) and 4 hours for other table types (only a broker-catchup
    window is needed before the replaced source segments are dropped).
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../helix/core/lineage/DefaultLineageManager.java  | 37 +++++-----
 .../core/lineage/DefaultLineageManagerTest.java    | 84 ++++++++++++++++++++--
 .../LineageDeleteInterleavingIntegrationTest.java  | 11 +--
 .../core/retention/SegmentLineageCleanupTest.java  | 64 ++++++++++++++++-
 4 files changed, 168 insertions(+), 28 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
index fa83fb9ab85..03e098dad33 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
@@ -39,7 +39,12 @@ import org.slf4j.LoggerFactory;
 
 public class DefaultLineageManager implements LineageManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultLineageManager.class);
-  private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(1L); // 1 day
+  // Default grace window before replaced (segmentsFrom) segments are dropped, 
used when the table does not
+  // configure replacedSegmentsRetentionPeriod. REFRESH tables keep a longer 
window because their replaced
+  // segments are the only copy of the previous snapshot and may be needed for 
a lineage rollback; other
+  // (e.g. APPEND) tables only need enough of a window for brokers to catch up 
on the latest lineage/IS/EV.
+  private static final long REFRESH_REPLACED_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.HOURS.toMillis(24L); // 24 hrs
+  private static final long DEFAULT_REPLACED_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.HOURS.toMillis(4L); // 4 hrs
   private static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(1L); // 1 day
 
   protected ControllerConf _controllerConf;
@@ -78,9 +83,15 @@ public class DefaultLineageManager implements LineageManager 
{
     long lineageCleanupRetentionMs = getRetentionMsFromConfig(
         
tableConfig.getValidationConfig().getLineageEntryCleanupRetentionPeriod(),
         LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS, tableNameWithType, 
"lineageEntryCleanupRetentionPeriod");
+    // When replacedSegmentsRetentionPeriod is configured it is honored as-is 
for every table type. When it is
+    // absent we fall back to a longer default for REFRESH tables (their 
replaced segments back a potential
+    // lineage rollback) and a shorter default for other table types (only a 
broker-catchup window is needed).
+    String batchSegmentIngestionType = 
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+    long defaultReplacedSegmentsRetentionMs = 
batchSegmentIngestionType.equalsIgnoreCase("REFRESH")
+        ? REFRESH_REPLACED_SEGMENTS_RETENTION_IN_MILLIS : 
DEFAULT_REPLACED_SEGMENTS_RETENTION_IN_MILLIS;
     long replacedSegmentsRetentionMs = getRetentionMsFromConfig(
         tableConfig.getValidationConfig().getReplacedSegmentsRetentionPeriod(),
-        REPLACED_SEGMENTS_RETENTION_IN_MILLIS, tableNameWithType, 
"replacedSegmentsRetentionPeriod");
+        defaultReplacedSegmentsRetentionMs, tableNameWithType, 
"replacedSegmentsRetentionPeriod");
     Set<String> segmentsForTable = new HashSet<>(allSegments);
     Iterator<LineageEntry> lineageEntryIterator = 
lineage.getLineageEntries().values().iterator();
     while (lineageEntryIterator.hasNext()) {
@@ -94,7 +105,7 @@ public class DefaultLineageManager implements LineageManager 
{
         } else {
           // If the lineage state is 'COMPLETED' and we already preserved the 
original segments for the required
           // retention, it is safe to delete all segments from 'segmentsFrom'
-          if (shouldDeleteReplacedSegments(tableConfig, lineageEntry, 
replacedSegmentsRetentionMs)) {
+          if (shouldDeleteReplacedSegments(lineageEntry, 
replacedSegmentsRetentionMs)) {
             segmentsToDelete.addAll(sourceSegments);
           }
         }
@@ -122,24 +133,18 @@ public class DefaultLineageManager implements 
LineageManager {
   /**
    * Helper function to decide whether we should delete segmentsFrom (replaced 
segments) given a lineage entry.
    *
-   * The replaced segments are safe to delete if either:
-   * 1) The table is not "REFRESH" (e.g. "APPEND"), in which case they are 
deleted immediately, or
-   * 2) The lineage entry has been in "COMPLETED" state for longer than {@code 
replacedSegmentsRetentionMs}
-   *    (configurable via {@code replacedSegmentsRetentionPeriod} in table 
config, defaulting to 1 day).
+   * The replaced segments are safe to delete once the lineage entry has been 
in "COMPLETED" state for longer
+   * than {@code replacedSegmentsRetentionMs}. The retention applies uniformly 
to all batch ingestion types —
+   * any replacement protocol (REFRESH-table snapshot replace, APPEND-table 
minion-driven replace,
+   * segment-group merge) gets the same configurable grace window before its 
replaced segments are dropped.
+   * The window is configurable via {@code replacedSegmentsRetentionPeriod} in 
table config; when it is unset
+   * the default is resolved per ingestion type by the caller (longer for 
REFRESH, shorter for others).
    *
-   * @param tableConfig a table config
    * @param lineageEntry lineage entry
    * @param replacedSegmentsRetentionMs configured retention in ms for 
replaced segments
    * @return True if we can safely delete the replaced segments. False 
otherwise.
    */
-  private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, 
LineageEntry lineageEntry,
-      long replacedSegmentsRetentionMs) {
-    // TODO: Currently, we preserve the replaced segments for REFRESH tables 
only. Once we support
-    // data rollback for APPEND tables, we should remove this check.
-    String batchSegmentIngestionType = 
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
-    if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")) {
-      return true;
-    }
+  private boolean shouldDeleteReplacedSegments(LineageEntry lineageEntry, long 
replacedSegmentsRetentionMs) {
     // Strict < means a 0ms retention won't delete on the exact same 
millisecond; this is intentional to
     // avoid edge-case races and is consistent with the existing behavior for 
non-zero retention values.
     return lineageEntry.getTimestamp() < (System.currentTimeMillis() - 
replacedSegmentsRetentionMs);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
index f803a06a08a..5b875cbc7e8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
@@ -159,22 +159,96 @@ public class DefaultLineageManagerTest {
   }
 
   @Test
-  public void testAppendTableAlwaysDeletesRegardlessOfRetentionConfig() {
-    // APPEND table: source segments are always eligible for deletion 
regardless of retention setting
+  public void testAppendTableHonorsReplacedSegmentsRetentionPeriod() {
+    // APPEND table with a non-zero replacedSegmentsRetentionPeriod must defer 
deletion until the
+    // retention window elapses — same semantics as REFRESH. This pins the 
symmetric behavior after
+    // the REFRESH-only gate in shouldDeleteReplacedSegments was removed: any 
replacement protocol
+    // (REFRESH snapshot replace, APPEND minion replace, segment-group merge) 
gets the same
+    // configurable grace window for its replaced segments.
     TableConfig tableConfig = 
appendTableBuilder().setReplacedSegmentsRetentionPeriod("7d").build();
 
-    String entryId = UUID.randomUUID().toString();
+    String recentEntryId = UUID.randomUUID().toString();
     long recentTimestamp = System.currentTimeMillis();
+    String oldEntryId = UUID.randomUUID().toString();
+    long oldTimestamp = System.currentTimeMillis() - 8 * 24 * 60 * 60 * 1000L; 
// 8 days ago > 7d retention
+
+    SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
+    lineage.addLineageEntry(recentEntryId,
+        new LineageEntry(Arrays.asList("recent_src"), 
Arrays.asList("recent_dst"), LineageEntryState.COMPLETED,
+            recentTimestamp));
+    lineage.addLineageEntry(oldEntryId,
+        new LineageEntry(Arrays.asList("old_src"), Arrays.asList("old_dst"), 
LineageEntryState.COMPLETED,
+            oldTimestamp));
+
+    List<String> segmentsToDelete = new ArrayList<>();
+    _lineageManager.updateLineageForRetention(tableConfig, lineage,
+        Arrays.asList("recent_src", "recent_dst", "old_src", "old_dst"), 
segmentsToDelete, new HashSet<>());
+
+    assertFalse(segmentsToDelete.contains("recent_src"),
+        "APPEND table source segments must be retained within the configured 
retention window");
+    assertTrue(segmentsToDelete.contains("old_src"),
+        "APPEND table source segments must be deleted once the configured 
retention window has elapsed");
+  }
+
+  @Test
+  public void testAppendTableDefaultRetentionDeletesAfterFourHours() {
+    // APPEND table with no configured replacedSegmentsRetentionPeriod falls 
back to the 4-hour default:
+    // a lineage entry completed 5 hours ago is past the window and its source 
segments MUST be deleted.
+    TableConfig tableConfig = appendTableBuilder().build();
+
+    String entryId = UUID.randomUUID().toString();
+    long fiveHoursAgo = System.currentTimeMillis() - 5 * 60 * 60 * 1000L;
     SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
     lineage.addLineageEntry(entryId,
-        new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"), 
LineageEntryState.COMPLETED, recentTimestamp));
+        new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"), 
LineageEntryState.COMPLETED, fiveHoursAgo));
 
     List<String> segmentsToDelete = new ArrayList<>();
     _lineageManager.updateLineageForRetention(tableConfig, lineage, 
Arrays.asList("src1", "dst1"), segmentsToDelete,
         new HashSet<>());
 
     assertTrue(segmentsToDelete.contains("src1"),
-        "APPEND table source segments must always be eligible for deletion");
+        "APPEND table source segments must be deleted once the default 4-hour 
retention has elapsed");
+  }
+
+  @Test
+  public void testAppendTableDefaultRetentionRetainsWithinFourHours() {
+    // APPEND table with no configured replacedSegmentsRetentionPeriod falls 
back to the 4-hour default:
+    // a lineage entry completed 1 hour ago is within the window and its 
source segments must be retained.
+    TableConfig tableConfig = appendTableBuilder().build();
+
+    String entryId = UUID.randomUUID().toString();
+    long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
+    SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
+    lineage.addLineageEntry(entryId,
+        new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"), 
LineageEntryState.COMPLETED, oneHourAgo));
+
+    List<String> segmentsToDelete = new ArrayList<>();
+    _lineageManager.updateLineageForRetention(tableConfig, lineage, 
Arrays.asList("src1", "dst1"), segmentsToDelete,
+        new HashSet<>());
+
+    assertFalse(segmentsToDelete.contains("src1"),
+        "APPEND table source segments must be retained within the default 
4-hour retention window");
+  }
+
+  @Test
+  public void testRefreshTableDefaultRetentionRetainsBeyondFourHours() {
+    // A REFRESH table's default retention (24 hours) is longer than an APPEND 
table's (4 hours): a lineage
+    // entry completed 5 hours ago is retained on REFRESH while it would be 
deleted on APPEND (see
+    // testAppendTableDefaultRetentionDeletesAfterFourHours). This pins the 
per-ingestion-type default divergence.
+    TableConfig tableConfig = refreshTableBuilder().build();
+
+    String entryId = UUID.randomUUID().toString();
+    long fiveHoursAgo = System.currentTimeMillis() - 5 * 60 * 60 * 1000L;
+    SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
+    lineage.addLineageEntry(entryId,
+        new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"), 
LineageEntryState.COMPLETED, fiveHoursAgo));
+
+    List<String> segmentsToDelete = new ArrayList<>();
+    _lineageManager.updateLineageForRetention(tableConfig, lineage, 
Arrays.asList("src1", "dst1"), segmentsToDelete,
+        new HashSet<>());
+
+    assertFalse(segmentsToDelete.contains("src1"),
+        "REFRESH table source segments must be retained within the default 
24-hour retention window");
   }
 
   // 
---------------------------------------------------------------------------
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
index ad922636e12..af8a4915a94 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
@@ -100,9 +100,12 @@ public class LineageDeleteInterleavingIntegrationTest {
     _resourceManager.addTable(tableConfig);
 
     TEST_INSTANCE.addDummySchema(RETENTION_RAW_TABLE_NAME);
+    // replacedSegmentsRetentionPeriod is set to 0s so the lineage-cleanup 
pass deletes replaced segments as soon
+    // as the entry has a timestamp strictly older than "now". This keeps the 
test deterministic without having to
+    // sleep through the default retention window (4 hours for this APPEND 
table).
     TableConfig retentionTableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RETENTION_RAW_TABLE_NAME).setNumReplicas(1)
-            .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").build();
+            
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setReplacedSegmentsRetentionPeriod("0s").build();
     _resourceManager.addTable(retentionTableConfig);
 
     // RetentionManager configured with zero frequencies so we can drive 
processTable() directly and validate the
@@ -378,9 +381,9 @@ public class LineageDeleteInterleavingIntegrationTest {
     assertTrue(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sExpired));
 
     // Run retention again. Now the COMPLETED entry's segmentsFrom is still 
lineage-locked w.r.t. time-based
-    // purge, but the lineage-cleanup pass is allowed to delete it (table is 
APPEND, so the replaced-segments
-    // retention window is bypassed). The cleanup must go through 
deleteSegmentsForLineageCleanup so it isn't
-    // self-blocked by the new check.
+    // purge, but the lineage-cleanup pass is allowed to delete it (the 
table's replacedSegmentsRetentionPeriod
+    // is configured to 0s, so the COMPLETED entry exits its retention window 
as soon as any time has elapsed).
+    // The cleanup must go through deleteSegmentsForLineageCleanup so it isn't 
self-blocked by the new check.
     _retentionManager.runProcessTable(RETENTION_OFFLINE_TABLE_NAME);
     // IdealState updates inside deleteSegmentsForLineageCleanup are 
synchronous (only the deep-store file deletion
     // is async via SegmentDeletionManager), so we can assert directly.
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index d5a7be2923b..5afeb48951a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.retention;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.lineage.LineageEntry;
@@ -51,6 +52,10 @@ public class SegmentLineageCleanupTest {
   private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
   private static final String OFFLINE_TABLE_NAME = "segmentTable_OFFLINE";
   private static final String REFRESH_OFFLINE_TABLE_NAME = 
"refreshSegmentTable_OFFLINE";
+  private static final String RETENTION_WINDOW_OFFLINE_TABLE_NAME = 
"retentionWindowSegmentTable_OFFLINE";
+  // Short retention so the test can observe the before/after states without 
holding up CI for long.
+  private static final String RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION = 
"2s";
+  private static final long RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION_MS = 
2_000L;
 
   private PinotHelixResourceManager _resourceManager;
   private RetentionManager _retentionManager;
@@ -73,10 +78,15 @@ public class SegmentLineageCleanupTest {
     // Create a schema
     
TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(OFFLINE_TABLE_NAME));
     
TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(REFRESH_OFFLINE_TABLE_NAME));
+    
TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(RETENTION_WINDOW_OFFLINE_TABLE_NAME));
 
-    // Update table config
+    // Update table config. replacedSegmentsRetentionPeriod is set to 0s so 
the lineage-cleanup pass deletes
+    // replaced segments as soon as the lineage entry timestamp is strictly 
older than "now". Without this,
+    // the default retention (4 hours for this APPEND table) would make the 
COMPLETED-lineage assertions in
+    // testSegmentLineageCleanup unable to observe deletion within the test's 
wait window.
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build();
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1)
+            .setReplacedSegmentsRetentionPeriod("0s").build();
     _resourceManager.addTable(tableConfig);
 
     IngestionConfig ingestionConfig = new IngestionConfig();
@@ -85,6 +95,13 @@ public class SegmentLineageCleanupTest {
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME).setNumReplicas(1)
             .setIngestionConfig(ingestionConfig).build();
     _resourceManager.addTable(refreshTableConfig);
+
+    // Table dedicated to exercising the replacedSegmentsRetentionPeriod 
window: the first retention pass must keep
+    // the replaced segments, and the second pass (after the window elapses) 
must delete them.
+    TableConfig retentionWindowTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RETENTION_WINDOW_OFFLINE_TABLE_NAME).setNumReplicas(1)
+            
.setReplacedSegmentsRetentionPeriod(RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION).build();
+    _resourceManager.addTable(retentionWindowTableConfig);
   }
 
   @Test
@@ -146,8 +163,12 @@ public class SegmentLineageCleanupTest {
   }
 
   private void verifySegmentsDeleted(int expectedNumRemainingSegments) {
+    verifySegmentsDeleted(OFFLINE_TABLE_NAME, expectedNumRemainingSegments);
+  }
+
+  private void verifySegmentsDeleted(String tableNameWithType, int 
expectedNumRemainingSegments) {
     // Segment deletion happens asynchronously
-    TestUtils.waitForCondition(aVoid -> getNumSegments(OFFLINE_TABLE_NAME) == 
expectedNumRemainingSegments, 60_000L,
+    TestUtils.waitForCondition(aVoid -> getNumSegments(tableNameWithType) == 
expectedNumRemainingSegments, 60_000L,
         "Failed to delete the segments");
   }
 
@@ -186,6 +207,43 @@ public class SegmentLineageCleanupTest {
     assertEquals(_resourceManager.getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, 
true).size(), 3);
   }
 
+  // Verifies that replacedSegmentsRetentionPeriod gates lineage-cleanup 
deletion: the first retention pass right
+  // after the lineage entry transitions to COMPLETED must preserve the 
replaced segments, and a later pass run
+  // after the configured window elapses must delete them.
+  @Test
+  public void testReplacedSegmentsRetentionWindow()
+      throws InterruptedException {
+    for (int i = 0; i < 2; i++) {
+      _resourceManager.addNewSegment(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
 "src_" + i), "downloadUrl");
+    }
+    _resourceManager.addNewSegment(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
+        
SegmentMetadataMockUtils.mockSegmentMetadata(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
 "merged_0"), "downloadUrl");
+    assertEquals(getNumSegments(RETENTION_WINDOW_OFFLINE_TABLE_NAME), 3);
+
+    long completedAtMs = System.currentTimeMillis();
+    SegmentLineage segmentLineage = new 
SegmentLineage(RETENTION_WINDOW_OFFLINE_TABLE_NAME);
+    segmentLineage.addLineageEntry("0",
+        new LineageEntry(Arrays.asList("src_0", "src_1"), 
Arrays.asList("merged_0"), LineageEntryState.COMPLETED,
+            completedAtMs));
+    
SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
 segmentLineage, -1);
+
+    // First pass runs inside the retention window — replaced segments must 
remain.
+    _retentionManager.processTable(RETENTION_WINDOW_OFFLINE_TABLE_NAME);
+    assertEquals(getNumSegments(RETENTION_WINDOW_OFFLINE_TABLE_NAME), 3);
+
+    // Sleep just past the configured window. Add a small buffer because the 
cleanup uses a strict less-than
+    // comparison against `now - retentionMs`, and we want any clock 
granularity to be on the safe side.
+    long sleepMs = (System.currentTimeMillis() - completedAtMs > 
RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION_MS)
+        ? 100L : (RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION_MS + 200L);
+    Thread.sleep(sleepMs);
+
+    // Second pass runs after the window — replaced segments must now be 
deleted.
+    _retentionManager.processTable(RETENTION_WINDOW_OFFLINE_TABLE_NAME);
+    verifySegmentsDeleted(RETENTION_WINDOW_OFFLINE_TABLE_NAME, 1);
+    assertEquals(getSegments(RETENTION_WINDOW_OFFLINE_TABLE_NAME), 
Collections.singletonList("merged_0"));
+  }
+
   @AfterClass
   public void tearDown() {
     TEST_INSTANCE.cleanup();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to