This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d530695c57 Use ideal state as source of truth for segment existence (#9735)
d530695c57 is described below
commit d530695c5759ae042896d40c989bed1b4ea872ec
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Nov 4 21:51:01 2022 -0700
Use ideal state as source of truth for segment existence (#9735)
---
.../api/resources/PinotSegmentRestletResource.java | 4 +-
.../helix/core/PinotHelixResourceManager.java | 56 ++++++++++------------
.../PinotHelixResourceManagerStatelessTest.java | 3 +-
3 files changed, 27 insertions(+), 36 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index ea77b9caec..c467c3f816 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -829,8 +829,8 @@ public class PinotSegmentRestletResource {
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
- deleteSegmentsInternal(tableNameWithType,
_pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
- retentionPeriod);
+ deleteSegmentsInternal(tableNameWithType,
+
_pinotHelixResourceManager.getSegmentsFromPropertyStore(tableNameWithType),
retentionPeriod);
return new SuccessResponse("All segments of table " + tableNameWithType +
" deleted");
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3e7ed24d5d..01b778a5f4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -730,22 +730,29 @@ public class PinotHelixResourceManager {
*/
/**
- * Returns the segments for the given table.
+ * Returns the segments for the given table from the ideal state.
*
* @param tableNameWithType Table name with type suffix
* @param shouldExcludeReplacedSegments whether to return the list of
segments that doesn't contain replaced segments.
* @return List of segment names
*/
public List<String> getSegmentsFor(String tableNameWithType, boolean
shouldExcludeReplacedSegments) {
- List<String> segmentsFromPropertiesStore =
ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
- if (shouldExcludeReplacedSegments) {
- return excludeReplacedSegments(tableNameWithType,
segmentsFromPropertiesStore);
- }
- return segmentsFromPropertiesStore;
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
+ List<String> segments = new ArrayList<>(idealState.getPartitionSet());
+ return shouldExcludeReplacedSegments ?
excludeReplacedSegments(tableNameWithType, segments) : segments;
}
/**
- * Returns the segments for the given table based on the start and end
timestamp.
+ * Returns the segments for the given table from the property store. This
API is useful to track the orphan segments
+ * that are removed from the ideal state but not the property store.
+ */
+ public List<String> getSegmentsFromPropertyStore(String tableNameWithType) {
+ return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+ }
+
+ /**
+ * Returns the segments for the given table based on the start and end
timestamp from the ideal state.
*
* @param tableNameWithType Table name with type suffix
* @param startTimestamp start timestamp in milliseconds (inclusive)
@@ -754,21 +761,24 @@ public class PinotHelixResourceManager {
*/
public List<String> getSegmentsForTableWithTimestamps(String
tableNameWithType, long startTimestamp,
long endTimestamp, boolean excludeOverlapping) {
- List<String> selectedSegments;
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
+ Set<String> segments = idealState.getPartitionSet();
// If no start and end timestamp specified, just select all the segments.
if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
- selectedSegments = getSegmentsFor(tableNameWithType, false);
+ return excludeReplacedSegments(tableNameWithType, new
ArrayList<>(segments));
} else {
- selectedSegments = new ArrayList<>();
+ List<String> selectedSegments = new ArrayList<>();
List<SegmentZKMetadata> segmentZKMetadataList =
getSegmentsZKMetadata(tableNameWithType);
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
String segmentName = segmentZKMetadata.getSegmentName();
- if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp,
endTimestamp, excludeOverlapping)) {
+ if (segments.contains(segmentName) &&
isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp,
+ excludeOverlapping)) {
selectedSegments.add(segmentName);
}
}
+ return excludeReplacedSegments(tableNameWithType, selectedSegments);
}
- return excludeReplacedSegments(tableNameWithType, selectedSegments);
}
/**
@@ -1890,7 +1900,7 @@ public class PinotHelixResourceManager {
// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ?
TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
- _segmentDeletionManager.removeSegmentsFromStore(offlineTableName,
getSegmentsFor(offlineTableName, false),
+ _segmentDeletionManager.removeSegmentsFromStore(offlineTableName,
getSegmentsFromPropertyStore(offlineTableName),
retentionPeriodMs);
LOGGER.info("Deleting table {}: Removed stored segments",
offlineTableName);
@@ -1947,7 +1957,7 @@ public class PinotHelixResourceManager {
// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ?
TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
- _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName,
getSegmentsFor(realtimeTableName, false),
+ _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName,
getSegmentsFromPropertyStore(realtimeTableName),
retentionPeriodMs);
LOGGER.info("Deleting table {}: Removed stored segments",
realtimeTableName);
@@ -3334,7 +3344,6 @@ public class PinotHelixResourceManager {
if (!segmentsToCleanUp.isEmpty()) {
LOGGER.info("Cleaning up the segments while startReplaceSegments:
{}", segmentsToCleanUp);
deleteSegments(tableNameWithType, segmentsToCleanUp);
- waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp,
SEGMENT_CLEANUP_TIMEOUT_MS);
}
return true;
} else {
@@ -3355,23 +3364,6 @@ public class PinotHelixResourceManager {
return segmentLineageEntryId;
}
- private void waitForSegmentsToDelete(String tableNameWithType, List<String>
segments, long timeOutInMillis)
- throws InterruptedException {
- LOGGER.info("Waiting for {} segments to delete for table: {}. timeout =
{}ms, segments = {}", segments.size(),
- tableNameWithType, timeOutInMillis, segments);
- long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
- do {
- if (Collections.disjoint(getSegmentsFor(tableNameWithType, false),
segments)) {
- return;
- } else {
- Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS);
- }
- } while (System.currentTimeMillis() < endTimeMs);
- throw new RuntimeException(
- "Timeout while waiting for segments to be deleted for table: " +
tableNameWithType + ", timeout: "
- + timeOutInMillis + "ms");
- }
-
/**
* Computes the end segment replace phase
*
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 5e9def8a56..6569a6a4be 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -1053,8 +1053,7 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to
check if the revertReplaceSegments correctly
// deleted (s9, s10, s11).
_helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId3, false);
- TestUtils.waitForCondition(aVoid ->
_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size() == 3,
- 60_000L, "Failed to delete the segments");
+ assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false).size(), 3);
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
true), "s3", "s4", "s5");
// Re-upload (s9, s10, s11) to test the segment clean up from
startReplaceSegments
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]