This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 22d583c Improving segment replacement/revert protocol (#7995)
22d583c is described below
commit 22d583c0933e4a59bb6ec19eb6eecf53e9ac5c6d
Author: Seunghyun Lee <[email protected]>
AuthorDate: Thu Jan 13 11:05:30 2022 -0800
Improving segment replacement/revert protocol (#7995)
1. Add a retention period for the original segments of REFRESH tables
after the replacement protocol completes successfully. For now, the
retention is 24 hours: the original data is kept for 24 hours so that
the user can revert to the original segments using the
revertReplaceSegments API.
2. Add proactive clean-up to startReplaceSegments (when forceCleanup is
enabled) so that we do not keep too many data snapshots for REFRESH
tables. For now, at most 2 snapshots are kept.
3. Add unit tests.
---
.../helix/core/PinotHelixResourceManager.java | 63 +++++--
.../helix/core/retention/RetentionManager.java | 55 +++++--
.../helix/core/PinotHelixResourceManagerTest.java | 182 +++++++++++++++++++++
.../core/retention/SegmentLineageCleanupTest.java | 90 +++++++---
4 files changed, 344 insertions(+), 46 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0af8abe..13a1fc3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2783,6 +2783,10 @@ public class PinotHelixResourceManager {
try {
DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch table config
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Preconditions.checkNotNull(tableConfig, "Table config is not available
for table '%s'", tableNameWithType);
+
// Fetch the segment lineage metadata
ZNRecord segmentLineageZNRecord =
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
@@ -2794,7 +2798,6 @@ public class PinotHelixResourceManager {
segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
expectedVersion = segmentLineageZNRecord.getVersion();
}
-
// Check that the segment lineage entry id doesn't exists in the
segment lineage
Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId)
== null,
String.format("SegmentLineageEntryId (%s) already exists in the
segment lineage.", segmentLineageEntryId));
@@ -2811,15 +2814,46 @@ public class PinotHelixResourceManager {
// By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
- // When 'forceCleanup' is enabled, we need to proactively revert the
lineage entry when we find the lineage
- // entry with the same 'segmentFrom' values.
- if (forceCleanup && lineageEntry.getState() ==
LineageEntryState.IN_PROGRESS && CollectionUtils
+ // When 'forceCleanup' is enabled, we need to proactively clean up
at the following cases:
+ // 1. Revert the lineage entry when we find the lineage entry with
the same 'segmentFrom' values. This is
+ // used to un-block the segment replacement protocol if the
previous attempt failed in the middle.
+ // 2. Proactively delete the oldest data snapshot to make sure that
we only keep at most 2 data snapshots
+ // at any time in case of REFRESH use case.
+ if (forceCleanup) {
+ if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS &&
CollectionUtils
.isEqualCollection(segmentsFrom,
lineageEntry.getSegmentsFrom())) {
- // Update segment lineage entry to 'REVERTED'
- updateSegmentLineageEntryToReverted(tableNameWithType,
segmentLineage, entryId, lineageEntry);
-
- // Add segments for proactive clean-up.
- segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+ LOGGER.info(
+ "Detected the incomplete lineage entry with the same
'segmentsFrom'. Reverting the lineage "
+ + "entry to unblock the new segment protocol.
tableNameWithType={}, entryId={}, segmentsFrom={}, "
+ + "segmentsTo={}", tableNameWithType, entryId,
lineageEntry.getSegmentsFrom(),
+ lineageEntry.getSegmentsTo());
+
+ // Update segment lineage entry to 'REVERTED'
+ updateSegmentLineageEntryToReverted(tableNameWithType,
segmentLineage, entryId, lineageEntry);
+
+ // Add segments for proactive clean-up.
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+ } else if (lineageEntry.getState() == LineageEntryState.COMPLETED
&& IngestionConfigUtils
+
.getBatchSegmentIngestionType(tableConfig).equalsIgnoreCase("REFRESH") &&
CollectionUtils
+ .isEqualCollection(segmentsFrom,
lineageEntry.getSegmentsTo())) {
+ // This part of code assumes that we only allow at most 2 data
snapshots at a time by proactively
+ // deleting the older snapshots (for REFRESH tables).
+ //
+ // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) //
previous lineage
+ // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) //
current lineage to be updated
+ // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to
keep 2 data snapshots
+ // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid
the disk space waste.
+ //
+ // TODO: make the number of allowed snapshots configurable to
allow users to keep at most N snapshots
+ // of data. We need to traverse the lineage by N steps
instead of 2 steps. We can build the reverse
+ // hash map (segmentsTo -> segmentsFrom) and traverse up
to N times before deleting.
+ //
+ LOGGER.info(
+ "Proactively deleting the replaced segments for REFRESH
table to avoid the excessive disk waste. "
+ + "tableNameWithType={}, segmentsToCleanUp={}",
tableNameWithType,
+ lineageEntry.getSegmentsFrom());
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
+ }
} else {
// Check that any segment from 'segmentsFrom' does not appear
twice.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
segmentsFrom), String
@@ -2844,6 +2878,7 @@ public class PinotHelixResourceManager {
// Trigger the proactive segment clean up if needed. Once the
lineage is updated in the property store, it
// is safe to physically delete segments.
if (!segmentsToCleanUp.isEmpty()) {
+ LOGGER.info("Cleaning up the segments while startReplaceSegments:
{}", segmentsToCleanUp);
deleteSegments(tableNameWithType, segmentsToCleanUp);
}
return true;
@@ -2899,10 +2934,12 @@ public class PinotHelixResourceManager {
// NO-OPS if the entry is already 'COMPLETED' or 'REVERTED'
if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS) {
- LOGGER.warn("Lineage entry state is not 'IN_PROGRESS'. Cannot update
to 'COMPLETED'. (tableNameWithType={}, "
- + "segmentLineageEntryId={}, state={})", tableNameWithType,
segmentLineageEntryId,
- lineageEntry.getState());
- return true;
+ String errorMsg = String.format(
+ "The target lineage entry state is not 'IN_PROGRESS'. Cannot
update to 'COMPLETED' state. "
+ + "(tableNameWithType=%s, segmentLineageEntryId=%s,
state=%s)", tableNameWithType,
+ segmentLineageEntryId, lineageEntry.getState());
+ LOGGER.error(errorMsg);
+ throw new RuntimeException(errorMsg);
}
// Check that all the segments from 'segmentsTo' exist in the table
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9ed7a69..a1679dc 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
*/
public class RetentionManager extends ControllerPeriodicTask<Void> {
public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS =
TimeUnit.DAYS.toMillis(5L);
-
+ private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS =
TimeUnit.DAYS.toMillis(1L); // 1 day
public static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS =
TimeUnit.DAYS.toMillis(1L); // 1 day
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
@@ -80,13 +80,20 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
@Override
protected void processTable(String tableNameWithType) {
+ // Fetch the table config once and pass it to both retention passes below; skip the table if it is missing.
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.error("Failed to get table config for table: {}",
tableNameWithType);
+ return;
+ }
+
// Manage normal table retention except segment lineage cleanup.
// The reason of separating the logic is that REFRESH only table will be
skipped in the first part,
// whereas the segment lineage cleanup needs to be handled.
- manageRetentionForTable(tableNameWithType);
+ manageRetentionForTable(tableConfig);
// Delete segments based on segment lineage and clean up segment lineage
metadata.
- manageSegmentLineageCleanupForTable(tableNameWithType);
+ manageSegmentLineageCleanupForTable(tableConfig);
}
@Override
@@ -95,16 +102,10 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
}
- private void manageRetentionForTable(String tableNameWithType) {
+ private void manageRetentionForTable(TableConfig tableConfig) {
+ String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Start managing retention for table: {}", tableNameWithType);
- // Build retention strategy from table config
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.error("Failed to get table config for table: {}",
tableNameWithType);
- return;
- }
-
// For offline tables, ensure that the segmentPushType is APPEND.
SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
String segmentPushType =
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
@@ -197,7 +198,8 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
}
- private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+ private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) {
+ String tableNameWithType = tableConfig.getTableName();
try {
DEFAULT_RETRY_POLICY.attempt(() -> {
// Fetch segment lineage
@@ -227,8 +229,11 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
// the lineage entry
segmentLineage.deleteLineageEntry(lineageEntryId);
} else {
- // If the lineage state is 'COMPLETED', it is safe to delete all
segments from 'segmentsFrom'
- segmentsToDelete.addAll(sourceSegments);
+ // If the lineage state is 'COMPLETED' and we already preserved
the original segments for the required
+ // retention, it is safe to delete all segments from
'segmentsFrom'
+ if (shouldDeleteReplacedSegments(tableConfig, lineageEntry)) {
+ segmentsToDelete.addAll(sourceSegments);
+ }
}
} else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
lineageEntry.getState() == LineageEntryState.IN_PROGRESS &&
lineageEntry.getTimestamp()
@@ -271,4 +276,26 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
LOGGER.info("Segment lineage metadata clean-up is successfully processed
for table: {}", tableNameWithType);
}
+
+  /**
+   * Helper function to decide whether we should delete segmentsFrom (replaced segments) given a 'COMPLETED'
+   * lineage entry.
+   *
+   * The replaced segments are safe to delete if EITHER of the following conditions is satisfied:
+   * 1) The table's batch segment ingestion type is not "REFRESH" (e.g. "APPEND"). Replaced segments are only
+   *    preserved for REFRESH tables.
+   * 2) The lineage entry timestamp is older than REPLACED_SEGMENTS_RETENTION_IN_MILLIS (1 day). This presumably
+   *    tracks when the entry became 'COMPLETED'; confirm against endReplaceSegments.
+   *
+   * @param tableConfig a table config
+   * @param lineageEntry lineage entry
+   * @return True if we can safely delete the replaced segments. False otherwise.
+   */
+  private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, LineageEntry lineageEntry) {
+    // TODO: Currently, we preserve the replaced segments for 1 day for REFRESH tables only. Once we support
+    //  data rollback for APPEND tables, we should remove this check.
+    String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+    if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")) {
+      // Non-REFRESH tables do not keep the replaced segments around for rollback.
+      return true;
+    }
+    // REFRESH tables keep the replaced segments until the retention window has elapsed so that the user can still
+    // revert to them via revertReplaceSegments.
+    return lineageEntry.getTimestamp() < System.currentTimeMillis() - REPLACED_SEGMENTS_RETENTION_IN_MILLIS;
+  }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index faf594a..9448fdf 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -56,6 +57,8 @@ import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -86,6 +89,10 @@ public class PinotHelixResourceManagerTest {
private static final String OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ private static final String SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME =
"segmentsReplaceTestRefreshTable";
+ private static final String OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME
=
+
TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+
private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
@@ -652,6 +659,181 @@ public class PinotHelixResourceManagerTest {
}
@Test
+  public void testSegmentReplacementForRefresh()
+      throws IOException, InterruptedException {
+    // Exercises the segment replacement protocol on a REFRESH table. This covers the proactive clean-up done by
+    // startReplaceSegments with 'forceCleanup' enabled, and the rejection of endReplaceSegments on a reverted entry.
+
+    // Create broker tenant on 1 Brokers
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 1, 0, 0);
+    PinotResourceManagerResponse response =
+        ControllerTestUtils.getHelixResourceManager().createBrokerTenant(brokerTenant);
+    Assert.assertTrue(response.isSuccessful());
+
+    // Create the table
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME)
+            .setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME)
+            .setIngestionConfig(
+                new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY"), null, null, null, null))
+            .build();
+
+    ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
+
+    for (int i = 0; i < 3; i++) {
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, "s" + i),
+          "downloadUrl");
+    }
+    List<String> segmentsForTable = ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false);
+    Assert.assertEquals(segmentsForTable.size(), 3);
+
+    List<String> segmentsFrom = Arrays.asList("s0", "s1", "s2");
+    List<String> segmentsTo = Arrays.asList("s3", "s4", "s5");
+
+    String lineageEntryId = ControllerTestUtils.getHelixResourceManager()
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, segmentsFrom, segmentsTo, false);
+    SegmentLineage segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+        Arrays.asList("s0", "s1", "s2"));
+    Assert
+        .assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), Arrays.asList("s3", "s4", "s5"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false)),
+        new HashSet<>(Arrays.asList("s0", "s1", "s2")));
+
+    // Add new segments
+    for (int i = 3; i < 6; i++) {
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, "s" + i),
+          "downloadUrl");
+    }
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false).size(), 6);
+    Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
+        new HashSet<>(Arrays.asList("s0", "s1", "s2")));
+
+    // Call end segment replacements
+    ControllerTestUtils.getHelixResourceManager()
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId);
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false).size(), 6);
+    Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+        Arrays.asList("s0", "s1", "s2"));
+    Assert
+        .assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), Arrays.asList("s3", "s4", "s5"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.COMPLETED);
+
+    // Start the new protocol with "forceCleanup = false" so there will be no proactive clean-up happening
+    segmentsFrom = Arrays.asList("s3", "s4", "s5");
+    segmentsTo = Arrays.asList("s6", "s7", "s8");
+
+    String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, segmentsFrom, segmentsTo, false);
+
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(),
+        Arrays.asList("s3", "s4", "s5"));
+    Assert
+        .assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), Arrays.asList("s6", "s7", "s8"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false).size(), 6);
+    Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    // Add partial segments
+    ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, "s6"),
+        "downloadUrl");
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false).size(), 7);
+    Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    // Start the new protocol with "forceCleanup = true" to check if 2 different proactive clean-up mechanism works:
+    // 1. the previous lineage entry (s3, s4, s5) -> (s6, s7, s8) should be "REVERTED"
+    // 2. the older snapshot (s0, s1, s2) needs to be cleaned up because we are about to upload the 3rd data snapshot
+    segmentsTo = Arrays.asList("s9", "s10", "s11");
+    String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager()
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, segmentsFrom, segmentsTo, true);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
+
+    // Check that the previous entry gets reverted
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.REVERTED);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(),
+        Arrays.asList("s3", "s4", "s5"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(),
+        Arrays.asList("s9", "s10", "s11"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.IN_PROGRESS);
+
+    // Check that the segments from the older lineage gets deleted
+    waitForSegmentsToDelete(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 3, MAX_TIMEOUT_IN_MILLISECOND);
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false).size(), 3);
+    Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    // Invoking end segment replacement for the reverted entry must fail because the entry is no longer 'IN_PROGRESS'.
+    try {
+      ControllerTestUtils.getHelixResourceManager()
+          .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId2);
+      Assert.fail("endReplaceSegments should fail for a lineage entry that is already 'REVERTED'");
+    } catch (Exception e) {
+      // Expected. Assert.fail() throws an AssertionError (not an Exception), so it is not swallowed here.
+    }
+
+    // Add new segments
+    for (int i = 9; i < 12; i++) {
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, "s" + i),
+          "downloadUrl");
+    }
+
+    // Call end segment replacements
+    ControllerTestUtils.getHelixResourceManager()
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId3);
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false).size(), 6);
+    Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
+        new HashSet<>(Arrays.asList("s9", "s10", "s11")));
+  }
+
+  /**
+   * Polls the table's segment list (every 500 ms) until it contains exactly {@code expectedNumSegmentsAfterDelete}
+   * segments. Fails once {@code timeOutInMillis} has elapsed.
+   *
+   * <p>The segment count is always re-checked after the last sleep. A deletion that completes during the final
+   * sleep interval is therefore not reported as a timeout.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param expectedNumSegmentsAfterDelete number of segments expected once deletion has finished
+   * @param timeOutInMillis maximum time to wait
+   * @throws InterruptedException if interrupted while sleeping between polls
+   * @throws RuntimeException if the expected number of segments is not reached before the timeout
+   */
+  private void waitForSegmentsToDelete(String tableNameWithType, int expectedNumSegmentsAfterDelete,
+      long timeOutInMillis)
+      throws InterruptedException {
+    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
+    while (true) {
+      int numSegments = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType, false).size();
+      if (numSegments == expectedNumSegmentsAfterDelete) {
+        return;
+      }
+      if (System.currentTimeMillis() >= endTimeMs) {
+        throw new RuntimeException(String.format(
+            "Timeout while waiting for segments to be deleted (tableNameWithType=%s, expectedNumSegments=%d, "
+                + "numSegments=%d, timeOutInMillis=%d)", tableNameWithType, expectedNumSegmentsAfterDelete,
+            numSegments, timeOutInMillis));
+      }
+      Thread.sleep(500L);
+    }
+  }
+
+ @Test
public void testGetLiveBrokersForTable()
throws IOException {
// Create broker tenant
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index b77e211..4a8cb0a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -36,6 +36,8 @@ import
org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -54,6 +56,7 @@ public class SegmentLineageCleanupTest {
private static final long MAX_TIMEOUT_IN_MILLISECOND = 10_000L; // 10 seconds
private static final String OFFLINE_TABLE_NAME = "segmentTable_OFFLINE";
+ private static final String REFRESH_OFFLINE_TABLE_NAME =
"refreshSegmentTable_OFFLINE";
private static final String BROKER_TENANT_NAME = "brokerTenant";
private static final String SERVER_TENANT_NAME = "serverTenant";
private static final String RETENTION_TIME_UNIT = "DAYS";
@@ -90,20 +93,25 @@ public class SegmentLineageCleanupTest {
controllerMetrics);
// Update table config
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME)
- .setBrokerTenant(BROKER_TENANT_NAME)
- .setServerTenant(SERVER_TENANT_NAME)
- .setNumReplicas(1)
- .setRetentionTimeUnit(RETENTION_TIME_UNIT)
- .setRetentionTimeValue(RETENTION_TIME_VALUE)
- .build();
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+
.setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1).setRetentionTimeUnit(RETENTION_TIME_UNIT)
+ .setRetentionTimeValue(RETENTION_TIME_VALUE).build();
ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
+
+ IngestionConfig ingestionConfig =
+ new IngestionConfig(new BatchIngestionConfig(null, "REFRESH",
"DAILY"), null, null, null, null);
+ TableConfig refreshTableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME)
+
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1)
+
.setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE)
+ .setIngestionConfig(ingestionConfig).build();
+ ControllerTestUtils.getHelixResourceManager().addTable(refreshTableConfig);
}
@Test
public void testSegmentLineageCleanup()
throws IOException, InterruptedException {
- // Create metadata for original segments
+ // Create metadata for original segments.
for (int i = 0; i < 5; i++) {
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"segment_" + i), "downloadUrl");
@@ -118,7 +126,7 @@ public class SegmentLineageCleanupTest {
7);
long currentTimeInMillis = System.currentTimeMillis();
- // Validate the case when the lineage entry state is 'IN_PROGRESS'
+ // Validate the case when the lineage entry state is 'IN_PROGRESS'.
SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
segmentLineage.addLineageEntry("0",
new LineageEntry(Arrays.asList("segment_0", "segment_1"),
Arrays.asList("merged_0"),
@@ -131,7 +139,7 @@ public class SegmentLineageCleanupTest {
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
false);
Assert.assertEquals(segmentsForTable.size(), 7);
- // Validate the case when the lineage entry state is 'COMPLETED'
+ // Validate the case when the lineage entry state is 'COMPLETED'.
segmentLineage.updateLineageEntry("0",
new LineageEntry(Arrays.asList("segment_0", "segment_1"),
Arrays.asList("merged_0"),
LineageEntryState.COMPLETED, currentTimeInMillis));
@@ -143,7 +151,7 @@ public class SegmentLineageCleanupTest {
Assert.assertEquals(segmentsForTable.size(), 5);
Assert.assertTrue(Collections.disjoint(segmentsForTable,
Arrays.asList("segment_0", "segment_1")));
- // Validate the case when the lineage entry state is 'COMPLETED' and all
segments are deleted
+ // Validate the case when the lineage entry state is 'COMPLETED' and all
segments are deleted.
ControllerTestUtils.getHelixResourceManager().deleteSegment(OFFLINE_TABLE_NAME,
"merged_0");
waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
_retentionManager.processTable(OFFLINE_TABLE_NAME);
@@ -151,12 +159,11 @@ public class SegmentLineageCleanupTest {
segmentsForTable =
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
false);
Assert.assertEquals(segmentsForTable.size(), 4);
Assert.assertTrue(Collections.disjoint(segmentsForTable,
Arrays.asList("segment_0", "segment_1", "merged_0")));
- segmentLineage =
- SegmentLineageAccessHelper
-
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
OFFLINE_TABLE_NAME);
+ segmentLineage = SegmentLineageAccessHelper
+
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
OFFLINE_TABLE_NAME);
Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 0);
- // Validate the case when the lineage entry state is 'IN_PROGRESS' and
timestamp is old
+ // Validate the case when the lineage entry state is 'IN_PROGRESS' and
timestamp is old.
LineageEntry lineageEntry =
new LineageEntry(Arrays.asList("segment_2", "segment_3"),
Arrays.asList("merged_1", "merged_2"),
LineageEntryState.IN_PROGRESS, currentTimeInMillis -
TimeUnit.DAYS.toMillis(2L));
@@ -168,15 +175,60 @@ public class SegmentLineageCleanupTest {
segmentsForTable =
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
false);
Assert.assertEquals(segmentsForTable.size(), 3);
Assert.assertTrue(Collections.disjoint(segmentsForTable,
Arrays.asList("merged_1", "merged_2")));
- segmentLineage =
- SegmentLineageAccessHelper
-
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
OFFLINE_TABLE_NAME);
+ segmentLineage = SegmentLineageAccessHelper
+
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
OFFLINE_TABLE_NAME);
Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
}
+  @Test
+  public void testRefreshTableCleanup()
+      throws InterruptedException {
+    // Verifies that the retention manager keeps the replaced segments of a REFRESH table for the retention period
+    // after the replacement is 'COMPLETED', and deletes them once that period has elapsed.
+
+    // Create metadata for original segments
+    for (int i = 0; i < 3; i++) {
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, "segment1_" + i), "downloadUrl");
+    }
+
+    // Create metadata for new segments.
+    for (int i = 0; i < 3; i++) {
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, "segment2_" + i), "downloadUrl");
+    }
+
+    Assert.assertEquals(
+        ControllerTestUtils.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, false).size(), 6);
+
+    // Validate the case when the lineage entry state is 'COMPLETED' and the entry is recent. An 'IN_PROGRESS' entry
+    // never triggers deletion of 'segmentsFrom', so only a 'COMPLETED' entry exercises the REFRESH retention.
+    List<String> segmentsFrom = Arrays.asList("segment1_0", "segment1_1", "segment1_2");
+    List<String> segmentsTo = Arrays.asList("segment2_0", "segment2_1", "segment2_2");
+    SegmentLineage segmentLineage = new SegmentLineage(REFRESH_OFFLINE_TABLE_NAME);
+    segmentLineage.addLineageEntry("0",
+        new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.COMPLETED, System.currentTimeMillis()));
+    SegmentLineageAccessHelper
+        .writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(), segmentLineage, -1);
+    _retentionManager.processTable(REFRESH_OFFLINE_TABLE_NAME);
+    try {
+      waitForSegmentsToDelete(REFRESH_OFFLINE_TABLE_NAME, 3, 1000L);
+      Assert.fail("Replaced segments of a REFRESH table must not be deleted before the retention period elapses");
+    } catch (Exception e) {
+      // Expected since the original segments are not supposed to be immediately erased by the retention manager.
+      // Assert.fail() throws an AssertionError (not an Exception), so it is not swallowed here.
+    }
+    List<String> segmentsForTable =
+        ControllerTestUtils.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, false);
+    Assert.assertEquals(segmentsForTable.size(), 6);
+
+    // With a 'COMPLETED' entry, only the replacing segments are selected once replaced segments are excluded.
+    segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, true);
+    Assert.assertEquals(segmentsForTable.size(), 3);
+    Assert.assertTrue(segmentsForTable.containsAll(segmentsTo));
+
+    // Once the lineage entry is older than the retention period, the replaced segments should be deleted.
+    segmentLineage.updateLineageEntry("0", new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.COMPLETED,
+        System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2L)));
+    SegmentLineageAccessHelper
+        .writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(), segmentLineage, -1);
+    _retentionManager.processTable(REFRESH_OFFLINE_TABLE_NAME);
+    waitForSegmentsToDelete(REFRESH_OFFLINE_TABLE_NAME, 3);
+    segmentsForTable = ControllerTestUtils.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, false);
+    Assert.assertEquals(segmentsForTable.size(), 3);
+    Assert.assertTrue(Collections.disjoint(segmentsForTable, segmentsFrom));
+  }
+
private void waitForSegmentsToDelete(String tableNameWithType, int
expectedNumSegmentsAfterDelete)
throws InterruptedException {
- long endTimeMs = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+ // Delegate to the timeout-aware overload using the class-wide default timeout (MAX_TIMEOUT_IN_MILLISECOND).
+ waitForSegmentsToDelete(tableNameWithType, expectedNumSegmentsAfterDelete,
MAX_TIMEOUT_IN_MILLISECOND);
+ }
+
+ private void waitForSegmentsToDelete(String tableNameWithType, int
expectedNumSegmentsAfterDelete,
+ long timeOutInMillis)
+ throws InterruptedException {
+ long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
do {
if
(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType,
false).size()
== expectedNumSegmentsAfterDelete) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]