This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 16efb8021be Honor replacedSegmentsRetentionPeriod for all table types
(#18583)
16efb8021be is described below
commit 16efb8021be5570a5fa898bc16f7d488fc5ada7b
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Jun 4 02:39:05 2026 +0530
Honor replacedSegmentsRetentionPeriod for all table types (#18583)
* Enable replacedSegmentsRetentionPeriod for all table types
* Configure replacedSegmentsRetentionPeriod=0s in tests that asserted
immediate APPEND-table cleanup
The replaced-segments retention window now applies uniformly to all batch
ingestion types, so tests that previously observed immediate cleanup on
APPEND tables must explicitly opt into the immediate-cleanup behavior via
the table config (replacedSegmentsRetentionPeriod=0s).
* Add test that validates replacedSegmentsRetentionPeriod gates lineage
cleanup
Configures a separate OFFLINE table with a 2-second window. The first
retention pass runs inside the window and must keep the replaced segments;
the second pass, run after the window elapses, must delete them.
* Resolve unconfigured replacedSegmentsRetentionPeriod default per
ingestion type
When replacedSegmentsRetentionPeriod is set on a table, it continues to
apply to every table type. When it is unset, the default is now resolved
per ingestion type: 24 hours for REFRESH tables (their replaced segments
back a potential lineage rollback) and 4 hours for all other table types
(only a broker-catchup window is needed before the replaced source segments
are dropped).
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.../helix/core/lineage/DefaultLineageManager.java | 37 +++++-----
.../core/lineage/DefaultLineageManagerTest.java | 84 ++++++++++++++++++++--
.../LineageDeleteInterleavingIntegrationTest.java | 11 +--
.../core/retention/SegmentLineageCleanupTest.java | 64 ++++++++++++++++-
4 files changed, 168 insertions(+), 28 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
index fa83fb9ab85..03e098dad33 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManager.java
@@ -39,7 +39,12 @@ import org.slf4j.LoggerFactory;
public class DefaultLineageManager implements LineageManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultLineageManager.class);
- private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS =
TimeUnit.DAYS.toMillis(1L); // 1 day
+ // Default grace window before replaced (segmentsFrom) segments are dropped,
used when the table does not
+ // configure replacedSegmentsRetentionPeriod. REFRESH tables keep a longer
window because their replaced
+ // segments are the only copy of the previous snapshot and may be needed for
a lineage rollback; other
+ // (e.g. APPEND) tables only need enough of a window for brokers to catch up
on the latest lineage/IS/EV.
+ private static final long REFRESH_REPLACED_SEGMENTS_RETENTION_IN_MILLIS =
TimeUnit.HOURS.toMillis(24L); // 24 hrs
+ private static final long DEFAULT_REPLACED_SEGMENTS_RETENTION_IN_MILLIS =
TimeUnit.HOURS.toMillis(4L); // 4 hrs
private static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS =
TimeUnit.DAYS.toMillis(1L); // 1 day
protected ControllerConf _controllerConf;
@@ -78,9 +83,15 @@ public class DefaultLineageManager implements LineageManager
{
long lineageCleanupRetentionMs = getRetentionMsFromConfig(
tableConfig.getValidationConfig().getLineageEntryCleanupRetentionPeriod(),
LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS, tableNameWithType,
"lineageEntryCleanupRetentionPeriod");
+ // When replacedSegmentsRetentionPeriod is configured it is honored as-is
for every table type. When it is
+ // absent we fall back to a longer default for REFRESH tables (their
replaced segments back a potential
+ // lineage rollback) and a shorter default for other table types (only a
broker-catchup window is needed).
+ String batchSegmentIngestionType =
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+ long defaultReplacedSegmentsRetentionMs =
batchSegmentIngestionType.equalsIgnoreCase("REFRESH")
+ ? REFRESH_REPLACED_SEGMENTS_RETENTION_IN_MILLIS :
DEFAULT_REPLACED_SEGMENTS_RETENTION_IN_MILLIS;
long replacedSegmentsRetentionMs = getRetentionMsFromConfig(
tableConfig.getValidationConfig().getReplacedSegmentsRetentionPeriod(),
- REPLACED_SEGMENTS_RETENTION_IN_MILLIS, tableNameWithType,
"replacedSegmentsRetentionPeriod");
+ defaultReplacedSegmentsRetentionMs, tableNameWithType,
"replacedSegmentsRetentionPeriod");
Set<String> segmentsForTable = new HashSet<>(allSegments);
Iterator<LineageEntry> lineageEntryIterator =
lineage.getLineageEntries().values().iterator();
while (lineageEntryIterator.hasNext()) {
@@ -94,7 +105,7 @@ public class DefaultLineageManager implements LineageManager
{
} else {
// If the lineage state is 'COMPLETED' and we already preserved the
original segments for the required
// retention, it is safe to delete all segments from 'segmentsFrom'
- if (shouldDeleteReplacedSegments(tableConfig, lineageEntry,
replacedSegmentsRetentionMs)) {
+ if (shouldDeleteReplacedSegments(lineageEntry,
replacedSegmentsRetentionMs)) {
segmentsToDelete.addAll(sourceSegments);
}
}
@@ -122,24 +133,18 @@ public class DefaultLineageManager implements
LineageManager {
/**
* Helper function to decide whether we should delete segmentsFrom (replaced
segments) given a lineage entry.
*
- * The replaced segments are safe to delete if either:
- * 1) The table is not "REFRESH" (e.g. "APPEND"), in which case they are
deleted immediately, or
- * 2) The lineage entry has been in "COMPLETED" state for longer than {@code
replacedSegmentsRetentionMs}
- * (configurable via {@code replacedSegmentsRetentionPeriod} in table
config, defaulting to 1 day).
+ * The replaced segments are safe to delete once the lineage entry has been
in "COMPLETED" state for longer
+ * than {@code replacedSegmentsRetentionMs}. The retention applies uniformly
to all batch ingestion types —
+ * any replacement protocol (REFRESH-table snapshot replace, APPEND-table
minion-driven replace,
+ * segment-group merge) gets the same configurable grace window before its
replaced segments are dropped.
+ * The window is configurable via {@code replacedSegmentsRetentionPeriod} in
table config; when it is unset
+ * the default is resolved per ingestion type by the caller (longer for
REFRESH, shorter for others).
*
- * @param tableConfig a table config
* @param lineageEntry lineage entry
* @param replacedSegmentsRetentionMs configured retention in ms for
replaced segments
* @return True if we can safely delete the replaced segments. False
otherwise.
*/
- private boolean shouldDeleteReplacedSegments(TableConfig tableConfig,
LineageEntry lineageEntry,
- long replacedSegmentsRetentionMs) {
- // TODO: Currently, we preserve the replaced segments for REFRESH tables
only. Once we support
- // data rollback for APPEND tables, we should remove this check.
- String batchSegmentIngestionType =
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
- if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")) {
- return true;
- }
+ private boolean shouldDeleteReplacedSegments(LineageEntry lineageEntry, long
replacedSegmentsRetentionMs) {
// Strict < means a 0ms retention won't delete on the exact same
millisecond; this is intentional to
// avoid edge-case races and is consistent with the existing behavior for
non-zero retention values.
return lineageEntry.getTimestamp() < (System.currentTimeMillis() -
replacedSegmentsRetentionMs);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
index f803a06a08a..5b875cbc7e8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/DefaultLineageManagerTest.java
@@ -159,22 +159,96 @@ public class DefaultLineageManagerTest {
}
@Test
- public void testAppendTableAlwaysDeletesRegardlessOfRetentionConfig() {
- // APPEND table: source segments are always eligible for deletion
regardless of retention setting
+ public void testAppendTableHonorsReplacedSegmentsRetentionPeriod() {
+ // APPEND table with a non-zero replacedSegmentsRetentionPeriod must defer
deletion until the
+ // retention window elapses — same semantics as REFRESH. This pins the
symmetric behavior after
+ // the REFRESH-only gate in shouldDeleteReplacedSegments was removed: any
replacement protocol
+ // (REFRESH snapshot replace, APPEND minion replace, segment-group merge)
gets the same
+ // configurable grace window for its replaced segments.
TableConfig tableConfig =
appendTableBuilder().setReplacedSegmentsRetentionPeriod("7d").build();
- String entryId = UUID.randomUUID().toString();
+ String recentEntryId = UUID.randomUUID().toString();
long recentTimestamp = System.currentTimeMillis();
+ String oldEntryId = UUID.randomUUID().toString();
+ long oldTimestamp = System.currentTimeMillis() - 8 * 24 * 60 * 60 * 1000L;
// 8 days ago > 7d retention
+
+ SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
+ lineage.addLineageEntry(recentEntryId,
+ new LineageEntry(Arrays.asList("recent_src"),
Arrays.asList("recent_dst"), LineageEntryState.COMPLETED,
+ recentTimestamp));
+ lineage.addLineageEntry(oldEntryId,
+ new LineageEntry(Arrays.asList("old_src"), Arrays.asList("old_dst"),
LineageEntryState.COMPLETED,
+ oldTimestamp));
+
+ List<String> segmentsToDelete = new ArrayList<>();
+ _lineageManager.updateLineageForRetention(tableConfig, lineage,
+ Arrays.asList("recent_src", "recent_dst", "old_src", "old_dst"),
segmentsToDelete, new HashSet<>());
+
+ assertFalse(segmentsToDelete.contains("recent_src"),
+ "APPEND table source segments must be retained within the configured
retention window");
+ assertTrue(segmentsToDelete.contains("old_src"),
+ "APPEND table source segments must be deleted once the configured
retention window has elapsed");
+ }
+
+ @Test
+ public void testAppendTableDefaultRetentionDeletesAfterFourHours() {
+ // APPEND table with no configured replacedSegmentsRetentionPeriod falls
back to the 4-hour default:
+ // a lineage entry completed 5 hours ago is past the window and its source
segments MUST be deleted.
+ TableConfig tableConfig = appendTableBuilder().build();
+
+ String entryId = UUID.randomUUID().toString();
+ long fiveHoursAgo = System.currentTimeMillis() - 5 * 60 * 60 * 1000L;
SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
lineage.addLineageEntry(entryId,
- new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"),
LineageEntryState.COMPLETED, recentTimestamp));
+ new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"),
LineageEntryState.COMPLETED, fiveHoursAgo));
List<String> segmentsToDelete = new ArrayList<>();
_lineageManager.updateLineageForRetention(tableConfig, lineage,
Arrays.asList("src1", "dst1"), segmentsToDelete,
new HashSet<>());
assertTrue(segmentsToDelete.contains("src1"),
- "APPEND table source segments must always be eligible for deletion");
+ "APPEND table source segments must be deleted once the default 4-hour
retention has elapsed");
+ }
+
+ @Test
+ public void testAppendTableDefaultRetentionRetainsWithinFourHours() {
+ // APPEND table with no configured replacedSegmentsRetentionPeriod falls
back to the 4-hour default:
+ // a lineage entry completed 1 hour ago is within the window and its
source segments must be retained.
+ TableConfig tableConfig = appendTableBuilder().build();
+
+ String entryId = UUID.randomUUID().toString();
+ long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
+ SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
+ lineage.addLineageEntry(entryId,
+ new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"),
LineageEntryState.COMPLETED, oneHourAgo));
+
+ List<String> segmentsToDelete = new ArrayList<>();
+ _lineageManager.updateLineageForRetention(tableConfig, lineage,
Arrays.asList("src1", "dst1"), segmentsToDelete,
+ new HashSet<>());
+
+ assertFalse(segmentsToDelete.contains("src1"),
+ "APPEND table source segments must be retained within the default
4-hour retention window");
+ }
+
+ @Test
+ public void testRefreshTableDefaultRetentionRetainsBeyondFourHours() {
+ // A REFRESH table's default retention (24 hours) is longer than an APPEND
table's (4 hours): a lineage
+ // entry completed 5 hours ago is retained on REFRESH while it would be
deleted on APPEND (see
+ // testAppendTableDefaultRetentionDeletesAfterFourHours). This pins the
per-ingestion-type default divergence.
+ TableConfig tableConfig = refreshTableBuilder().build();
+
+ String entryId = UUID.randomUUID().toString();
+ long fiveHoursAgo = System.currentTimeMillis() - 5 * 60 * 60 * 1000L;
+ SegmentLineage lineage = new SegmentLineage("testTable_OFFLINE");
+ lineage.addLineageEntry(entryId,
+ new LineageEntry(Arrays.asList("src1"), Arrays.asList("dst1"),
LineageEntryState.COMPLETED, fiveHoursAgo));
+
+ List<String> segmentsToDelete = new ArrayList<>();
+ _lineageManager.updateLineageForRetention(tableConfig, lineage,
Arrays.asList("src1", "dst1"), segmentsToDelete,
+ new HashSet<>());
+
+ assertFalse(segmentsToDelete.contains("src1"),
+ "REFRESH table source segments must be retained within the default
24-hour retention window");
}
//
---------------------------------------------------------------------------
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
index ad922636e12..af8a4915a94 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/lineage/LineageDeleteInterleavingIntegrationTest.java
@@ -100,9 +100,12 @@ public class LineageDeleteInterleavingIntegrationTest {
_resourceManager.addTable(tableConfig);
TEST_INSTANCE.addDummySchema(RETENTION_RAW_TABLE_NAME);
+ // replacedSegmentsRetentionPeriod is set to 0s so the lineage-cleanup
pass deletes replaced segments as soon
+ // as the entry has a timestamp strictly older than "now". This keeps the
test deterministic without having to
+ // sleep through the default retention window (4 hours for this APPEND
table).
TableConfig retentionTableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RETENTION_RAW_TABLE_NAME).setNumReplicas(1)
- .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").build();
+
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setReplacedSegmentsRetentionPeriod("0s").build();
_resourceManager.addTable(retentionTableConfig);
// RetentionManager configured with zero frequencies so we can drive
processTable() directly and validate the
@@ -378,9 +381,9 @@ public class LineageDeleteInterleavingIntegrationTest {
assertTrue(getSegments(RETENTION_OFFLINE_TABLE_NAME).contains(sExpired));
// Run retention again. Now the COMPLETED entry's segmentsFrom is still
lineage-locked w.r.t. time-based
- // purge, but the lineage-cleanup pass is allowed to delete it (table is
APPEND, so the replaced-segments
- // retention window is bypassed). The cleanup must go through
deleteSegmentsForLineageCleanup so it isn't
- // self-blocked by the new check.
+ // purge, but the lineage-cleanup pass is allowed to delete it (the
table's replacedSegmentsRetentionPeriod
+ // is configured to 0s, so the COMPLETED entry exits its retention window
as soon as any time has elapsed).
+ // The cleanup must go through deleteSegmentsForLineageCleanup so it isn't
self-blocked by the new check.
_retentionManager.runProcessTable(RETENTION_OFFLINE_TABLE_NAME);
// IdealState updates inside deleteSegmentsForLineageCleanup are
synchronous (only the deep-store file deletion
// is async via SegmentDeletionManager), so we can assert directly.
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index d5a7be2923b..5afeb48951a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.retention;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.lineage.LineageEntry;
@@ -51,6 +52,10 @@ public class SegmentLineageCleanupTest {
private static final ControllerTest TEST_INSTANCE =
ControllerTest.getInstance();
private static final String OFFLINE_TABLE_NAME = "segmentTable_OFFLINE";
private static final String REFRESH_OFFLINE_TABLE_NAME =
"refreshSegmentTable_OFFLINE";
+ private static final String RETENTION_WINDOW_OFFLINE_TABLE_NAME =
"retentionWindowSegmentTable_OFFLINE";
+ // Short retention so the test can observe the before/after states without
holding up CI for long.
+ private static final String RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION =
"2s";
+ private static final long RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION_MS =
2_000L;
private PinotHelixResourceManager _resourceManager;
private RetentionManager _retentionManager;
@@ -73,10 +78,15 @@ public class SegmentLineageCleanupTest {
// Create a schema
TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(OFFLINE_TABLE_NAME));
TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(REFRESH_OFFLINE_TABLE_NAME));
+
TEST_INSTANCE.addDummySchema(TableNameBuilder.extractRawTableName(RETENTION_WINDOW_OFFLINE_TABLE_NAME));
- // Update table config
+ // Update table config. replacedSegmentsRetentionPeriod is set to 0s so
the lineage-cleanup pass deletes
+ // replaced segments as soon as the lineage entry timestamp is strictly
older than "now". Without this,
+ // the default retention (4 hours for this APPEND table) would make the
COMPLETED-lineage assertions in
+ // testSegmentLineageCleanup unable to observe deletion within the test's
wait window.
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build();
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1)
+ .setReplacedSegmentsRetentionPeriod("0s").build();
_resourceManager.addTable(tableConfig);
IngestionConfig ingestionConfig = new IngestionConfig();
@@ -85,6 +95,13 @@ public class SegmentLineageCleanupTest {
new
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME).setNumReplicas(1)
.setIngestionConfig(ingestionConfig).build();
_resourceManager.addTable(refreshTableConfig);
+
+ // Table dedicated to exercising the replacedSegmentsRetentionPeriod
window: the first retention pass must keep
+ // the replaced segments, and the second pass (after the window elapses)
must delete them.
+ TableConfig retentionWindowTableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RETENTION_WINDOW_OFFLINE_TABLE_NAME).setNumReplicas(1)
+
.setReplacedSegmentsRetentionPeriod(RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION).build();
+ _resourceManager.addTable(retentionWindowTableConfig);
}
@Test
@@ -146,8 +163,12 @@ public class SegmentLineageCleanupTest {
}
private void verifySegmentsDeleted(int expectedNumRemainingSegments) {
+ verifySegmentsDeleted(OFFLINE_TABLE_NAME, expectedNumRemainingSegments);
+ }
+
+ private void verifySegmentsDeleted(String tableNameWithType, int
expectedNumRemainingSegments) {
// Segment deletion happens asynchronously
- TestUtils.waitForCondition(aVoid -> getNumSegments(OFFLINE_TABLE_NAME) ==
expectedNumRemainingSegments, 60_000L,
+ TestUtils.waitForCondition(aVoid -> getNumSegments(tableNameWithType) ==
expectedNumRemainingSegments, 60_000L,
"Failed to delete the segments");
}
@@ -186,6 +207,43 @@ public class SegmentLineageCleanupTest {
assertEquals(_resourceManager.getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME,
true).size(), 3);
}
+ // Verifies that replacedSegmentsRetentionPeriod gates lineage-cleanup
deletion: the first retention pass right
+ // after the lineage entry transitions to COMPLETED must preserve the
replaced segments, and a later pass run
+ // after the configured window elapses must delete them.
+ @Test
+ public void testReplacedSegmentsRetentionWindow()
+ throws InterruptedException {
+ for (int i = 0; i < 2; i++) {
+ _resourceManager.addNewSegment(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
"src_" + i), "downloadUrl");
+ }
+ _resourceManager.addNewSegment(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(RETENTION_WINDOW_OFFLINE_TABLE_NAME,
"merged_0"), "downloadUrl");
+ assertEquals(getNumSegments(RETENTION_WINDOW_OFFLINE_TABLE_NAME), 3);
+
+ long completedAtMs = System.currentTimeMillis();
+ SegmentLineage segmentLineage = new
SegmentLineage(RETENTION_WINDOW_OFFLINE_TABLE_NAME);
+ segmentLineage.addLineageEntry("0",
+ new LineageEntry(Arrays.asList("src_0", "src_1"),
Arrays.asList("merged_0"), LineageEntryState.COMPLETED,
+ completedAtMs));
+
SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
segmentLineage, -1);
+
+ // First pass runs inside the retention window — replaced segments must
remain.
+ _retentionManager.processTable(RETENTION_WINDOW_OFFLINE_TABLE_NAME);
+ assertEquals(getNumSegments(RETENTION_WINDOW_OFFLINE_TABLE_NAME), 3);
+
+ // Sleep just past the configured window. Add a small buffer because the
cleanup uses a strict less-than
+ // comparison against `now - retentionMs`, and we want any clock
granularity to be on the safe side.
+ long sleepMs = (System.currentTimeMillis() - completedAtMs >
RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION_MS)
+ ? 100L : (RETENTION_WINDOW_REPLACED_SEGMENTS_RETENTION_MS + 200L);
+ Thread.sleep(sleepMs);
+
+ // Second pass runs after the window — replaced segments must now be
deleted.
+ _retentionManager.processTable(RETENTION_WINDOW_OFFLINE_TABLE_NAME);
+ verifySegmentsDeleted(RETENTION_WINDOW_OFFLINE_TABLE_NAME, 1);
+ assertEquals(getSegments(RETENTION_WINDOW_OFFLINE_TABLE_NAME),
Collections.singletonList("merged_0"));
+ }
+
@AfterClass
public void tearDown() {
TEST_INSTANCE.cleanup();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]