Jackie-Jiang commented on code in PR #10815:
URL: https://github.com/apache/pinot/pull/10815#discussion_r1299499174
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java:
##########
@@ -44,4 +47,8 @@ public static String getSegmentNamePrefix(String segmentName)
{
public static String generateSegmentFileName(String segmentNameStr) {
return getSegmentNamePrefix(segmentNameStr) + UUID.randomUUID().toString();
}
+
+ public static boolean isTmpFile(String uri) {
+ return TMP_FILE.matcher(uri).find();
Review Comment:
Based on how the name is generated, should we consider splitting the name
with `TMP` and then trying to parse the second part as a UUID?
```suggestion
String[] splits = StringUtils.splitByWholeSeparator(uri, TMP);
if (splits.length != 2) {
return false;
}
try {
UUID.fromString(splits[1]);
return true;
} catch (IllegalArgumentException e) {
return false;
}
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1465,6 +1469,65 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
+ public long deleteTmpSegments(String tableNameWithType) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return 0L;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
deletion of tmp segments", tableNameWithType);
+ return 0L;
+ }
+
+ if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+ || !getIsSplitCommitEnabled()
+ || !isTmpSegmentAsyncDeletionEnabled()) {
+ return 0L;
+ }
Review Comment:
These checks (low-level consumer, split commit enabled, and tmp segment async deletion enabled) are redundant.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1465,6 +1469,65 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
+ public long deleteTmpSegments(String tableNameWithType) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return 0L;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
deletion of tmp segments", tableNameWithType);
+ return 0L;
+ }
+
+ if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+ || !getIsSplitCommitEnabled()
+ || !isTmpSegmentAsyncDeletionEnabled()) {
+ return 0L;
+ }
+
+ // Delete tmp segments for realtime table with low level consumer, split
commit and async deletion is enabled.
+ List<SegmentZKMetadata> segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+ Set<String> deepURIs = segmentsZKMetadata.stream().parallel().filter(meta
-> meta.getStatus() == Status.DONE
+ &&
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
+ SegmentZKMetadata::getDownloadUrl).collect(
+ Collectors.toSet());
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(),
rawTableName);
+ PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
+ long orphanTmpSegments = 0;
+ try {
+ for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+ // prepend scheme
+ URI uri = URIUtils.getUri(filePath);
+ if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
+ LOGGER.info("Deleting temporary segment file: {}", uri);
+ Preconditions.checkState(pinotFS.delete(uri, true), "Failed to
delete file: %s", uri);
+ orphanTmpSegments++;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while deleting temporary files for table:
{}", rawTableName, e);
+ }
+ return orphanTmpSegments;
+ }
+
+ private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS
pinotFS) throws Exception {
+ long lastModified = pinotFS.lastModified(uri);
+ String uriString = uri.toString();
+ return SegmentCompletionUtils.isTmpFile(uriString) &&
!deepURIs.contains(uriString)
Review Comment:
To reduce unnecessary FS access, first check the name, and only then read the
modified time.
I don't think we need to check whether the file is used as a download URI,
because a tmp file should never be used as one.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1465,6 +1469,65 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
+ public long deleteTmpSegments(String tableNameWithType) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return 0L;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
deletion of tmp segments", tableNameWithType);
+ return 0L;
+ }
+
+ if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+ || !getIsSplitCommitEnabled()
+ || !isTmpSegmentAsyncDeletionEnabled()) {
+ return 0L;
+ }
+
+ // Delete tmp segments for realtime table with low level consumer, split
commit and async deletion is enabled.
+ List<SegmentZKMetadata> segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+ Set<String> deepURIs = segmentsZKMetadata.stream().parallel().filter(meta
-> meta.getStatus() == Status.DONE
+ &&
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
+ SegmentZKMetadata::getDownloadUrl).collect(
+ Collectors.toSet());
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(),
rawTableName);
+ PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
+ long orphanTmpSegments = 0;
+ try {
+ for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+ // prepend scheme
+ URI uri = URIUtils.getUri(filePath);
+ if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
+ LOGGER.info("Deleting temporary segment file: {}", uri);
+ Preconditions.checkState(pinotFS.delete(uri, true), "Failed to
delete file: %s", uri);
+ orphanTmpSegments++;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while deleting temporary files for table:
{}", rawTableName, e);
+ }
+ return orphanTmpSegments;
+ }
+
+ private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS
pinotFS) throws Exception {
+ long lastModified = pinotFS.lastModified(uri);
+ String uriString = uri.toString();
Review Comment:
We are converting the string to a URI and then back to a string; can we pass in the string directly?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1465,6 +1469,65 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
+ public long deleteTmpSegments(String tableNameWithType) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return 0L;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
deletion of tmp segments", tableNameWithType);
+ return 0L;
+ }
+
+ if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+ || !getIsSplitCommitEnabled()
+ || !isTmpSegmentAsyncDeletionEnabled()) {
+ return 0L;
+ }
+
+ // Delete tmp segments for realtime table with low level consumer, split
commit and async deletion is enabled.
+ List<SegmentZKMetadata> segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
Review Comment:
We should avoid reading the ZK metadata of all segments, because it is very
expensive.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]