This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch fix-remove-aged-deleted-segments in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 187d0a669f1eb58e4c730f1be9d03491bb767987 Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Mon Feb 6 20:24:22 2023 -0800 Fix race condition on removing aged deleted segments --- .../org/apache/pinot/common/utils/URIUtils.java | 8 +++++ .../apache/pinot/common/utils/URIUtilsTest.java | 7 ++++ .../helix/core/SegmentDeletionManager.java | 40 ++++++++++++---------- .../helix/core/retention/RetentionManager.java | 2 +- .../helix/core/retention/RetentionManagerTest.java | 6 ++-- .../core/util/SegmentDeletionManagerTest.java | 9 ++--- 6 files changed, 46 insertions(+), 26 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java index 042427b772..56902f8bf5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java @@ -70,6 +70,14 @@ public class URIUtils { return stringJoiner.toString(); } + /** + * Returns the last part for the given path split by the file separator. + * If the file separator is not found, returns the whole path as the last part. + */ + public static String getLastPart(String path) { + return path.substring(path.lastIndexOf(File.separator) + 1); + } + /** * Returns the download URL with the segment name encoded. */ diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java index 4cec446067..b498ce9b0f 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java @@ -52,6 +52,13 @@ public class URIUtilsTest { assertEquals(URIUtils.getPath("file:/foo/bar", "table", "segment+%25"), "file:/foo/bar/table/segment+%25"); } + @Test + public void testGetLastPart() { + assertEquals(URIUtils.getLastPart("http://foo/bar"), "bar"); + assertEquals(URIUtils.getLastPart("/foo/bar"), "bar"); + assertEquals(URIUtils.getLastPart("file:/foo/bar"), "bar"); + } + @Test public void testConstructDownloadUrl() { assertEquals(URIUtils.constructDownloadUrl("http://foo/bar", "table", "segment"), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index ff74bea708..ecc7d337e8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -44,6 +44,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.common.utils.URIUtils; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.filesystem.PinotFS; @@ -264,7 +265,7 @@ public class SegmentDeletionManager { /** * Removes aged deleted segments from the deleted directory */ - public void removeAgedDeletedSegments() { + public void removeAgedDeletedSegments(LeadControllerManager leadControllerManager) { if (_dataDir != null) { URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS); PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme()); @@ -287,26 +288,29 @@ public class SegmentDeletionManager { } for (String tableNameDir : tableNameDirs) { - URI tableNameURI = URIUtils.getUri(tableNameDir); - // Get files that are aged - final String[] targetFiles = pinotFS.listFiles(tableNameURI, false); - int numFilesDeleted = 0; - for (String targetFile : targetFiles) { - URI targetURI = URIUtils.getUri(targetFile); - long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, pinotFS.lastModified(targetURI)); - if (System.currentTimeMillis() >= deletionTimeMs) { - if (!pinotFS.delete(targetURI, true)) { - LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString()); - } else { - numFilesDeleted++; + String tableName = URIUtils.getLastPart(tableNameDir); + if (leadControllerManager.isLeaderForTable(tableName)) { + URI tableNameURI = URIUtils.getUri(tableNameDir); + // Get files that are aged + final String[] targetFiles = pinotFS.listFiles(tableNameURI, false); + int numFilesDeleted = 0; + for (String targetFile : targetFiles) { + URI targetURI = URIUtils.getUri(targetFile); + long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, pinotFS.lastModified(targetURI)); + if (System.currentTimeMillis() >= deletionTimeMs) { + if (!pinotFS.delete(targetURI, true)) { + LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI); + } else { + numFilesDeleted++; + } } } - } - if (numFilesDeleted == targetFiles.length) { - // Delete directory if it's empty - if (!pinotFS.delete(tableNameURI, false)) { - LOGGER.warn("The directory {} cannot be removed.", tableNameDir); + if (numFilesDeleted == targetFiles.length) { + // Delete directory if it's empty + if (!pinotFS.delete(tableNameURI, false)) { + LOGGER.warn("The directory {} cannot be removed.", tableNameDir); + } } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java index 624ee914d7..ea9cda626f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java @@ -94,7 +94,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { @Override protected void postprocess() { LOGGER.info("Removing aged deleted segments for all tables"); - _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(); + _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_leadControllerManager); } private void manageRetentionForTable(TableConfig tableConfig) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index 85aba90f36..fa6e9b0ed9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -98,7 +98,7 @@ public class RetentionManagerTest { SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager(); // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called. - verify(deletionManager, times(1)).removeAgedDeletedSegments(); + verify(deletionManager, times(1)).removeAgedDeletedSegments(any()); // Verify that the deleteSegments method is actually called. verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList()); @@ -177,7 +177,7 @@ public class RetentionManagerTest { throws Throwable { return null; } - }).when(deletionManager).removeAgedDeletedSegments(); + }).when(deletionManager).removeAgedDeletedSegments(any()); when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager); // If and when PinotHelixResourceManager.deleteSegments() is invoked, make sure that the segments deleted @@ -229,7 +229,7 @@ public class RetentionManagerTest { SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager(); // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called. - verify(deletionManager, times(1)).removeAgedDeletedSegments(); + verify(deletionManager, times(1)).removeAgedDeletedSegments(any()); // Verify that the deleteSegments method is actually called. verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java index a617dd8612..2dc4c4ac0e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java @@ -52,6 +52,7 @@ import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -230,7 +231,7 @@ public class SegmentDeletionManagerTest { tempDir.getAbsolutePath(), helixAdmin, propertyStore, 7); // Test delete when deleted segments directory does not exists - deletionManager.removeAgedDeletedSegments(); + deletionManager.removeAgedDeletedSegments(any()); // Create deleted directory String deletedDirectoryPath = tempDir + File.separator + "Deleted_Segments"; @@ -238,7 +239,7 @@ public class SegmentDeletionManagerTest { deletedDirectory.mkdir(); // Test delete when deleted segments directory is empty - deletionManager.removeAgedDeletedSegments(); + deletionManager.removeAgedDeletedSegments(any()); // Create dummy directories and files File dummyDir1 = new File(deletedDirectoryPath + File.separator + "dummy1"); @@ -249,7 +250,7 @@ public class SegmentDeletionManagerTest { dummyDir3.mkdir(); // Test delete when there is no files but some directories exist - deletionManager.removeAgedDeletedSegments(); + deletionManager.removeAgedDeletedSegments(any()); Assert.assertEquals(dummyDir1.exists(), false); Assert.assertEquals(dummyDir2.exists(), false); Assert.assertEquals(dummyDir3.exists(), false); @@ -279,7 +280,7 @@ public class SegmentDeletionManagerTest { Assert.assertEquals(dummyDir3.list().length, 3); // Try to remove files with the retention of 1 days. - deletionManager.removeAgedDeletedSegments(); + deletionManager.removeAgedDeletedSegments(any()); // Check that only 1 day retention file is remaining Assert.assertEquals(dummyDir1.list().length, 1); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
