This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 287f552  add retention period to deleted segment files and allow table 
level o… (#8176)
287f552 is described below

commit 287f55212d012b18803bfb365ba7d4ee501f6acb
Author: Rong Rong <[email protected]>
AuthorDate: Fri Feb 18 11:38:00 2022 -0800

    add retention period to deleted segment files and allow table level o… 
(#8176)
    
    Add support for controlling the retention period of deleted segments on a per-table basis.
---
 .../helix/core/PinotHelixResourceManager.java      |   3 +-
 .../helix/core/SegmentDeletionManager.java         | 106 ++++++++++++---
 .../helix/core/retention/RetentionManager.java     |  10 +-
 .../helix/core/retention/RetentionManagerTest.java |   7 +-
 .../core/util/SegmentDeletionManagerTest.java      | 144 ++++++++++++++++++---
 .../SegmentsValidationAndRetentionConfig.java      |   9 ++
 .../spi/utils/builder/TableConfigBuilder.java      |   3 +
 7 files changed, 229 insertions(+), 53 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 38d47d5..df64e2d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -707,7 +707,8 @@ public class PinotHelixResourceManager {
       
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
           "Table name: %s is not a valid table name with type suffix", 
tableNameWithType);
       HelixHelper.removeSegmentsFromIdealState(_helixZkManager, 
tableNameWithType, segmentNames);
-      _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames);
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, 
tableConfig);
       return PinotResourceManagerResponse.success("Segment " + segmentNames + 
" deleted");
     } catch (final Exception e) {
       LOGGER.error("Caught exception while deleting segment: {} from table: 
{}", segmentNames, tableNameWithType, e);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 6407568..35de4e7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core;
 
 import java.io.IOException;
 import java.net.URI;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -27,10 +28,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.ZNRecord;
@@ -40,10 +44,12 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,14 +59,26 @@ public class SegmentDeletionManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentDeletionManager.class);
   private static final long MAX_DELETION_DELAY_SECONDS = 300L;  // Maximum of 
5 minutes back-off to retry the deletion
   private static final long DEFAULT_DELETION_DELAY_SECONDS = 2L;
+
+  // The retention deadline is written as a suffix to deleted segment files under the 
`Deleted_Segments` folder, for example:
+  // 
`Deleted_Segments/myTable/myTable_mySegment_0__RETENTION_UNTIL__202202021200` 
to indicate that this segment
+  // file becomes eligible for permanent deletion at 2022-02-02 12:00 UTC.
   private static final String DELETED_SEGMENTS = "Deleted_Segments";
+  private static final String RETENTION_UNTIL_SEPARATOR = 
"__RETENTION_UNTIL__";
+  private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm";
+  private static final SimpleDateFormat RETENTION_DATE_FORMAT;
+
+  static {
+    RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
+    RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
 
   private final ScheduledExecutorService _executorService;
   private final String _dataDir;
   private final String _helixClusterName;
   private final HelixAdmin _helixAdmin;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final int _defaultDeletedSegmentsRetentionInDays;
+  private final long _defaultDeletedSegmentsRetentionMs;
 
   public SegmentDeletionManager(String dataDir, HelixAdmin helixAdmin, String 
helixClusterName,
       ZkHelixPropertyStore<ZNRecord> propertyStore, int 
deletedSegmentsRetentionInDays) {
@@ -68,7 +86,7 @@ public class SegmentDeletionManager {
     _helixAdmin = helixAdmin;
     _helixClusterName = helixClusterName;
     _propertyStore = propertyStore;
-    _defaultDeletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+    _defaultDeletedSegmentsRetentionMs = 
TimeUnit.DAYS.toMillis(deletedSegmentsRetentionInDays);
 
     _executorService = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactory() {
       @Override
@@ -84,22 +102,29 @@ public class SegmentDeletionManager {
     _executorService.shutdownNow();
   }
 
-  public void deleteSegments(final String tableName, final Collection<String> 
segmentIds) {
-    deleteSegmentsWithDelay(tableName, segmentIds, 
DEFAULT_DELETION_DELAY_SECONDS);
+  public void deleteSegments(String tableName, Collection<String> segmentIds) {
+    deleteSegments(tableName, segmentIds, null);
+  }
+
+  public void deleteSegments(String tableName, Collection<String> segmentIds,
+      @Nullable TableConfig tableConfig) {
+    Long deletedSegmentsRetentionMs = 
getRetentionMsFromTableConfig(tableConfig);
+    deleteSegmentsWithDelay(tableName, segmentIds, deletedSegmentsRetentionMs, 
DEFAULT_DELETION_DELAY_SECONDS);
   }
 
-  protected void deleteSegmentsWithDelay(final String tableName, final 
Collection<String> segmentIds,
-      final long deletionDelaySeconds) {
+  protected void deleteSegmentsWithDelay(String tableName, Collection<String> 
segmentIds,
+      Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
     _executorService.schedule(new Runnable() {
       @Override
       public void run() {
-        deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, 
deletionDelaySeconds);
+        deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, 
deletedSegmentsRetentionMs,
+            deletionDelaySeconds);
       }
     }, deletionDelaySeconds, TimeUnit.SECONDS);
   }
 
   protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String 
tableName, Collection<String> segmentIds,
-      long deletionDelay) {
+      Long deletedSegmentsRetentionMs, long deletionDelay) {
     // Check if segment got removed from ExternalView or IdealState
     ExternalView externalView = 
_helixAdmin.getResourceExternalView(_helixClusterName, tableName);
     IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
@@ -151,7 +176,7 @@ public class SegmentDeletionManager {
       }
       segmentsToDelete.removeAll(propStoreFailedSegs);
 
-      removeSegmentsFromStore(tableName, segmentsToDelete);
+      removeSegmentsFromStore(tableName, segmentsToDelete, 
deletedSegmentsRetentionMs);
     }
 
     LOGGER.info("Deleted {} segments from table {}:{}", 
segmentsToDelete.size(), tableName,
@@ -160,27 +185,35 @@ public class SegmentDeletionManager {
     if (!segmentsToRetryLater.isEmpty()) {
       long effectiveDeletionDelay = Math.min(deletionDelay * 2, 
MAX_DELETION_DELAY_SECONDS);
       LOGGER.info("Postponing deletion of {} segments from table {}", 
segmentsToRetryLater.size(), tableName);
-      deleteSegmentsWithDelay(tableName, segmentsToRetryLater, 
effectiveDeletionDelay);
+      deleteSegmentsWithDelay(tableName, segmentsToRetryLater, 
deletedSegmentsRetentionMs, effectiveDeletionDelay);
       return;
     }
   }
 
   public void removeSegmentsFromStore(String tableNameWithType, List<String> 
segments) {
+    removeSegmentsFromStore(tableNameWithType, segments, null);
+  }
+
+  public void removeSegmentsFromStore(String tableNameWithType, List<String> 
segments,
+      @Nullable Long deletedSegmentsRetentionMs) {
     for (String segment : segments) {
-      removeSegmentFromStore(tableNameWithType, segment);
+      removeSegmentFromStore(tableNameWithType, segment, 
deletedSegmentsRetentionMs);
     }
   }
 
-  protected void removeSegmentFromStore(String tableNameWithType, String 
segmentId) {
+  protected void removeSegmentFromStore(String tableNameWithType, String 
segmentId,
+      @Nullable Long deletedSegmentsRetentionMs) {
     // Ignore HLC segments as they are not stored in Pinot FS
     if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
       return;
     }
     if (_dataDir != null) {
+      long retentionMs = deletedSegmentsRetentionMs == null
+          ? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs;
       String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
       URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, 
URIUtils.encode(segmentId));
       PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
-      if (_defaultDeletedSegmentsRetentionInDays <= 0) {
+      if (retentionMs <= 0) {
         // delete the segment file instantly if retention is set to zero
         try {
           if (pinotFS.delete(fileToDeleteURI, true)) {
@@ -193,8 +226,9 @@ public class SegmentDeletionManager {
         }
       } else {
         // move the segment file to deleted segments first and let retention 
manager handler the deletion
-        URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, 
DELETED_SEGMENTS, rawTableName,
-            URIUtils.encode(segmentId));
+        String deletedFileName = deletedSegmentsRetentionMs == null ? 
URIUtils.encode(segmentId)
+            : getDeletedSegmentFileName(URIUtils.encode(segmentId), 
deletedSegmentsRetentionMs);
+        URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, 
DELETED_SEGMENTS, rawTableName, deletedFileName);
         try {
           if (pinotFS.exists(fileToDeleteURI)) {
             // Overwrites the file if it already exists in the target 
directory.
@@ -223,9 +257,8 @@ public class SegmentDeletionManager {
 
   /**
    * Removes aged deleted segments from the deleted directory
-   * @param retentionInDays: retention for deleted segments in days
    */
-  public void removeAgedDeletedSegments(int retentionInDays) {
+  public void removeAgedDeletedSegments() {
     if (_dataDir != null) {
       URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
       PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
@@ -254,8 +287,8 @@ public class SegmentDeletionManager {
           int numFilesDeleted = 0;
           for (String targetFile : targetFiles) {
             URI targetURI = URIUtils.getUri(targetFile);
-            Date dateToDelete = 
DateTime.now().minusDays(retentionInDays).toDate();
-            if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
+            long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, 
pinotFS.lastModified(targetURI));
+            if (System.currentTimeMillis() >= deletionTimeMs) {
               if (!pinotFS.delete(targetURI, true)) {
                 LOGGER.warn("Cannot remove file {} from deleted directory.", 
targetURI.toString());
               } else {
@@ -278,4 +311,37 @@ public class SegmentDeletionManager {
       LOGGER.info("dataDir is not configured, won't delete any expired 
segments from deleted directory.");
     }
   }
+
+  private String getDeletedSegmentFileName(String fileName, long 
deletedSegmentsRetentionMs) {
+    return fileName + RETENTION_UNTIL_SEPARATOR + 
RETENTION_DATE_FORMAT.format(new Date(
+        System.currentTimeMillis() + deletedSegmentsRetentionMs));
+  }
+
+  private long getDeletionTimeMsFromFile(String targetFile, long 
lastModifiedTime) {
+    String[] split = StringUtils.splitByWholeSeparator(targetFile, 
RETENTION_UNTIL_SEPARATOR);
+    if (split.length == 2) {
+      try {
+        return RETENTION_DATE_FORMAT.parse(split[1]).getTime();
+      } catch (Exception e) {
+        LOGGER.warn("No retention suffix found for file: {}", targetFile);
+      }
+    }
+    LOGGER.info("Fallback to using default cluster retention config: {} ms", 
_defaultDeletedSegmentsRetentionMs);
+    return lastModifiedTime + _defaultDeletedSegmentsRetentionMs;
+  }
+
+  @Nullable
+  private static Long getRetentionMsFromTableConfig(@Nullable TableConfig 
tableConfig) {
+    if (tableConfig != null) {
+      SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
+      if 
(!StringUtils.isEmpty(validationConfig.getDeletedSegmentsRetentionPeriod())) {
+        try {
+          return 
TimeUtils.convertPeriodToMillis(validationConfig.getDeletedSegmentsRetentionPeriod());
+        } catch (Exception e) {
+          LOGGER.warn("Unable to parse deleted segment retention config for 
table {}", tableConfig.getTableName(), e);
+        }
+      }
+    }
+    return null;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index d66d02d..bf24f78 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -64,17 +64,13 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RetentionManager.class);
 
-  private final int _deletedSegmentsRetentionInDays;
-
   public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics) {
     super("RetentionManager", 
config.getRetentionControllerFrequencyInSeconds(),
         config.getRetentionManagerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
         controllerMetrics);
-    _deletedSegmentsRetentionInDays = 
config.getDeletedSegmentsRetentionInDays();
 
-    LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, 
deletedSegmentsRetentionInDays: {}",
-        getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
+    LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", 
getIntervalInSeconds());
   }
 
   @Override
@@ -97,8 +93,8 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
 
   @Override
   protected void postprocess() {
-    LOGGER.info("Removing aged (more than {} days) deleted segments for all 
tables", _deletedSegmentsRetentionInDays);
-    
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+    LOGGER.info("Removing aged deleted segments for all tables");
+    
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments();
   }
 
   private void manageRetentionForTable(TableConfig tableConfig) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index ef69d84..ce90263 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -49,7 +49,6 @@ import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
 
@@ -103,7 +102,7 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager 
is actually called.
-    verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
+    verify(deletionManager, times(1)).removeAgedDeletedSegments();
 
     // Verify that the deleteSegments method is actually called.
     verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), 
anyList());
@@ -177,7 +176,7 @@ public class RetentionManagerTest {
           throws Throwable {
         return null;
       }
-    }).when(deletionManager).removeAgedDeletedSegments(anyInt());
+    }).when(deletionManager).removeAgedDeletedSegments();
     
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
 
     // If and when PinotHelixResourceManager.deleteSegments() is invoked, make 
sure that the segments deleted
@@ -229,7 +228,7 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager 
is actually called.
-    verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
+    verify(deletionManager, times(1)).removeAgedDeletedSegments();
 
     // Verify that the deleteSegments method is actually called.
     verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), 
anyList());
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index 0984a14..8d56e7c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -20,23 +20,32 @@ package org.apache.pinot.controller.helix.core.util;
 
 import com.google.common.io.Files;
 import java.io.File;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
 import org.joda.time.DateTime;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -51,8 +60,17 @@ import static org.mockito.Mockito.when;
 
 
 public class SegmentDeletionManagerTest {
-  private final static String TABLE_NAME = "table";
-  private final static String CLUSTER_NAME = "mock";
+  private static final String TABLE_NAME = "table";
+  private static final String CLUSTER_NAME = "mock";
+  // These constants must match those in SegmentDeletionManager.
+  private static final String RETENTION_UNTIL_SEPARATOR = 
"__RETENTION_UNTIL__";
+  private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm";
+  private static final SimpleDateFormat RETENTION_DATE_FORMAT;
+
+  static {
+    RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
+    RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
 
   HelixAdmin makeHelixAdmin() {
     HelixAdmin admin = mock(HelixAdmin.class);
@@ -208,10 +226,11 @@ public class SegmentDeletionManagerTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
     File tempDir = Files.createTempDir();
     tempDir.deleteOnExit();
-    FakeDeletionManager deletionManager = new 
FakeDeletionManager(tempDir.getAbsolutePath(), helixAdmin, propertyStore);
+    FakeDeletionManager deletionManager = new FakeDeletionManager(
+        tempDir.getAbsolutePath(), helixAdmin, propertyStore, 7);
 
     // Test delete when deleted segments directory does not exists
-    deletionManager.removeAgedDeletedSegments(1);
+    deletionManager.removeAgedDeletedSegments();
 
     // Create deleted directory
     String deletedDirectoryPath = tempDir + File.separator + 
"Deleted_Segments";
@@ -219,29 +238,36 @@ public class SegmentDeletionManagerTest {
     deletedDirectory.mkdir();
 
     // Test delete when deleted segments directory is empty
-    deletionManager.removeAgedDeletedSegments(1);
+    deletionManager.removeAgedDeletedSegments();
 
     // Create dummy directories and files
     File dummyDir1 = new File(deletedDirectoryPath + File.separator + 
"dummy1");
     dummyDir1.mkdir();
     File dummyDir2 = new File(deletedDirectoryPath + File.separator + 
"dummy2");
     dummyDir2.mkdir();
+    File dummyDir3 = new File(deletedDirectoryPath + File.separator + 
"dummy3");
+    dummyDir3.mkdir();
 
     // Test delete when there is no files but some directories exist
-    deletionManager.removeAgedDeletedSegments(1);
+    deletionManager.removeAgedDeletedSegments();
     Assert.assertEquals(dummyDir1.exists(), false);
     Assert.assertEquals(dummyDir2.exists(), false);
+    Assert.assertEquals(dummyDir3.exists(), false);
 
-    // Create dummy directories and files
+    // Create dummy directories and files again
     dummyDir1.mkdir();
     dummyDir2.mkdir();
+    dummyDir3.mkdir();
 
     // Create dummy files
     for (int i = 0; i < 3; i++) {
-      createTestFileWithAge(dummyDir1.getAbsolutePath() + File.separator + 
"file" + i, i);
+      createTestFileWithAge(dummyDir1.getAbsolutePath() + File.separator + 
genDeletedSegmentName("file" + i, i, 1), i);
     }
     for (int i = 2; i < 5; i++) {
-      createTestFileWithAge(dummyDir2.getAbsolutePath() + File.separator + 
"file" + i, i);
+      createTestFileWithAge(dummyDir2.getAbsolutePath() + File.separator + 
genDeletedSegmentName("file" + i, i, 1), i);
+    }
+    for (int i = 6; i < 9; i++) {
+      createTestFileWithAge(dummyDir3.getAbsolutePath() + File.separator + 
"file" + i, i);
     }
 
     // Sleep 1 second to ensure the clock moves.
@@ -250,18 +276,92 @@ public class SegmentDeletionManagerTest {
     // Check that dummy directories and files are successfully created.
     Assert.assertEquals(dummyDir1.list().length, 3);
     Assert.assertEquals(dummyDir2.list().length, 3);
+    Assert.assertEquals(dummyDir3.list().length, 3);
 
-    // Try to remove files with the retention of 3 days.
-    deletionManager.removeAgedDeletedSegments(3);
-    Assert.assertEquals(dummyDir1.list().length, 3);
-    Assert.assertEquals(dummyDir2.list().length, 1);
+    // Remove aged deleted segments (dummy1/dummy2 files carry a 1-day retention suffix; dummy3 files have none).
+    deletionManager.removeAgedDeletedSegments();
 
-    // Try to further remove files with the retention of 1 days.
-    deletionManager.removeAgedDeletedSegments(1);
+    // Check that only the file still within its 1-day retention period remains
     Assert.assertEquals(dummyDir1.list().length, 1);
 
     // Check that empty directory has successfully been removed.
     Assert.assertEquals(dummyDir2.exists(), false);
+
+    // Check that deleted file without retention suffix is honoring 
cluster-wide retention period of 7 days.
+    Assert.assertEquals(dummyDir3.list().length, 1);
+  }
+
+  @Test
+  public void testSegmentDeletionLogic()
+      throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY 
+ ".class",
+        LocalPinotFS.class.getName());
+    PinotFSFactory.init(new PinotConfiguration(properties));
+
+    HelixAdmin helixAdmin = makeHelixAdmin();
+    ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+    SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+        tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
+
+    // create table segment files.
+    Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
+    createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted());
+    final File tableDir = new File(tempDir.getAbsolutePath() + File.separator 
+ TABLE_NAME);
+    final File deletedTableDir = new File(tempDir.getAbsolutePath() + 
File.separator + "Deleted_Segments"
+        + File.separator + TABLE_NAME);
+
+    // delete the segments instantly.
+    SegmentsValidationAndRetentionConfig mockValidationConfig = 
mock(SegmentsValidationAndRetentionConfig.class);
+    
when(mockValidationConfig.getDeletedSegmentsRetentionPeriod()).thenReturn("0d");
+    TableConfig mockTableConfig = mock(TableConfig.class);
+    
when(mockTableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
+    deletionManager.deleteSegments(TABLE_NAME, segments, mockTableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        Assert.assertEquals(tableDir.listFiles().length, 0);
+        Assert.assertTrue(!deletedTableDir.exists() || 
deletedTableDir.listFiles().length == 0);
+        return true;
+      } catch (Throwable t) {
+        return false;
+      }
+    }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+
+    // create table segment files again to test default retention.
+    createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted());
+    // delete the segments with default retention
+    deletionManager.deleteSegments(TABLE_NAME, segments);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        Assert.assertEquals(tableDir.listFiles().length, 0);
+        Assert.assertEquals(deletedTableDir.listFiles().length, 
segments.size());
+        return true;
+      } catch (Throwable t) {
+        return false;
+      }
+    }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+  }
+
+  public void createTableAndSegmentFiles(File tempDir, List<String> segmentIds)
+      throws Exception {
+    File tableDir = new File(tempDir.getAbsolutePath() + File.separator + 
TABLE_NAME);
+    tableDir.mkdir();
+    for (String segmentId : segmentIds) {
+      createTestFileWithAge(tableDir.getAbsolutePath() + File.separator + 
segmentId, 0);
+    }
+  }
+
+  public String genDeletedSegmentName(String fileName, int age, int 
retentionInDays) {
+    // subtract one hour from the deletion time to make sure the test goes 
past the retention period, because
+    // the date format only keeps minute-level precision (no seconds).
+    return StringUtils.join(fileName, RETENTION_UNTIL_SEPARATOR, 
RETENTION_DATE_FORMAT.format(new Date(
+        DateTime.now().minusDays(age).getMillis()
+            + TimeUnit.DAYS.toMillis(retentionInDays)
+            - TimeUnit.HOURS.toMillis(1))));
   }
 
   public void createTestFileWithAge(String path, int age)
@@ -280,22 +380,24 @@ public class SegmentDeletionManagerTest {
       super(null, helixAdmin, CLUSTER_NAME, propertyStore, 0);
     }
 
-    FakeDeletionManager(String localDiskDir, HelixAdmin helixAdmin, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
-      super(localDiskDir, helixAdmin, CLUSTER_NAME, propertyStore, 0);
+    FakeDeletionManager(String localDiskDir, HelixAdmin helixAdmin, 
ZkHelixPropertyStore<ZNRecord> propertyStore, int
+        deletedSegmentsRetentionInDays) {
+      super(localDiskDir, helixAdmin, CLUSTER_NAME, propertyStore, 
deletedSegmentsRetentionInDays);
     }
 
     public void deleteSegmentsFromPropertyStoreAndLocal(String tableName, 
Collection<String> segments) {
-      super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L);
+      super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L, 
0L);
     }
 
     @Override
-    protected void removeSegmentFromStore(String tableName, String segmentId) {
+    protected void removeSegmentFromStore(String tableName, String segmentId,
+        @Nullable Long deletedSegmentsRetentionMs) {
       _segmentsRemovedFromStore.add(segmentId);
     }
 
     @Override
-    protected void deleteSegmentsWithDelay(final String tableName, final 
Collection<String> segmentIds,
-        final long deletionDelaySeconds) {
+    protected void deleteSegmentsWithDelay(String tableName, 
Collection<String> segmentIds,
+        @Nullable Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
       _segmentsToRetry.addAll(segmentIds);
     }
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index befc3e2..b57cef1 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -29,6 +29,7 @@ import org.apache.pinot.spi.utils.TimeUtils;
 public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
   private String _retentionTimeUnit;
   private String _retentionTimeValue;
+  private String _deletedSegmentsRetentionPeriod;
   @Deprecated
   private String _segmentPushFrequency; // DO NOT REMOVE, this is used in 
internal segment generation management
   @Deprecated
@@ -102,6 +103,14 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
     _retentionTimeValue = retentionTimeValue;
   }
 
+  public String getDeletedSegmentsRetentionPeriod() {
+    return _deletedSegmentsRetentionPeriod;
+  }
+
+  public void setDeletedSegmentsRetentionPeriod(String 
deletedSegmentsRetentionPeriod) {
+    _deletedSegmentsRetentionPeriod = deletedSegmentsRetentionPeriod;
+  }
+
   /**
    * @deprecated Use {@code segmentIngestionFrequency} from {@link 
IngestionConfig#getBatchIngestionConfig()}
    */
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index ba8a07b..8871c6e 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -50,6 +50,7 @@ public class TableConfigBuilder {
   private static final String DEFAULT_SEGMENT_PUSH_TYPE = "APPEND";
   private static final String REFRESH_SEGMENT_PUSH_TYPE = "REFRESH";
   private static final String DEFAULT_SEGMENT_ASSIGNMENT_STRATEGY = 
"BalanceNumSegmentAssignmentStrategy";
+  private static final String DEFAULT_DELETED_SEGMENTS_RETENTION_PERIOD = "7d";
   private static final String DEFAULT_NUM_REPLICAS = "1";
   private static final String DEFAULT_LOAD_MODE = "HEAP";
   private static final String MMAP_LOAD_MODE = "MMAP";
@@ -67,6 +68,7 @@ public class TableConfigBuilder {
   private boolean _allowNullTimeValue;
   private String _retentionTimeUnit;
   private String _retentionTimeValue;
+  private String _deletedSegmentsRetentionPeriod = 
DEFAULT_DELETED_SEGMENTS_RETENTION_PERIOD;
   @Deprecated
   private String _segmentPushFrequency;
   @Deprecated
@@ -373,6 +375,7 @@ public class TableConfigBuilder {
     validationConfig.setAllowNullTimeValue(_allowNullTimeValue);
     validationConfig.setRetentionTimeUnit(_retentionTimeUnit);
     validationConfig.setRetentionTimeValue(_retentionTimeValue);
+    
validationConfig.setDeletedSegmentsRetentionPeriod(_deletedSegmentsRetentionPeriod);
     validationConfig.setSegmentPushFrequency(_segmentPushFrequency);
     validationConfig.setSegmentPushType(_segmentPushType);
     validationConfig.setSegmentAssignmentStrategy(_segmentAssignmentStrategy);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to