This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 287f552 add retention period to deleted segment files and allow table
level o… (#8176)
287f552 is described below
commit 287f55212d012b18803bfb365ba7d4ee501f6acb
Author: Rong Rong <[email protected]>
AuthorDate: Fri Feb 18 11:38:00 2022 -0800
add retention period to deleted segment files and allow table level o…
(#8176)
Add support for controlling the retention period of deleted segment files on a per-table basis.
---
.../helix/core/PinotHelixResourceManager.java | 3 +-
.../helix/core/SegmentDeletionManager.java | 106 ++++++++++++---
.../helix/core/retention/RetentionManager.java | 10 +-
.../helix/core/retention/RetentionManagerTest.java | 7 +-
.../core/util/SegmentDeletionManagerTest.java | 144 ++++++++++++++++++---
.../SegmentsValidationAndRetentionConfig.java | 9 ++
.../spi/utils/builder/TableConfigBuilder.java | 3 +
7 files changed, 229 insertions(+), 53 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 38d47d5..df64e2d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -707,7 +707,8 @@ public class PinotHelixResourceManager {
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix",
tableNameWithType);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager,
tableNameWithType, segmentNames);
- _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames);
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
tableConfig);
return PinotResourceManagerResponse.success("Segment " + segmentNames +
" deleted");
} catch (final Exception e) {
LOGGER.error("Caught exception while deleting segment: {} from table:
{}", segmentNames, tableNameWithType, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 6407568..35de4e7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core;
import java.io.IOException;
import java.net.URI;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -27,10 +28,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.ZNRecord;
@@ -40,10 +44,12 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,14 +59,26 @@ public class SegmentDeletionManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentDeletionManager.class);
private static final long MAX_DELETION_DELAY_SECONDS = 300L; // Maximum of
5 minutes back-off to retry the deletion
private static final long DEFAULT_DELETION_DELAY_SECONDS = 2L;
+
+ // Retention date format will be written as suffix to deleted segments under
`Deleted_Segments` folder. for example:
+ //
`Deleted_Segments/myTable/myTable_mySegment_0__RETENTION_UNTIL__202202021200`
to indicate that this segment
+ // file will be permanently deleted after Feb 2nd 2022 12PM.
private static final String DELETED_SEGMENTS = "Deleted_Segments";
+  // Separator placed between a deleted segment's (encoded) file name and its retention-until timestamp.
+  private static final String RETENTION_UNTIL_SEPARATOR = "__RETENTION_UNTIL__";
+  // Minute-precision timestamp pattern used for the retention-until suffix.
+  private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm";
+  // NOTE: SimpleDateFormat instances are not thread-safe; any concurrent use of this shared instance must be
+  // externally synchronized.
+  private static final SimpleDateFormat RETENTION_DATE_FORMAT;
+
+  static {
+    RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
+    // Suffix timestamps are always written and parsed in UTC so they do not depend on the controller's time zone.
+    RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
private final ScheduledExecutorService _executorService;
private final String _dataDir;
private final String _helixClusterName;
private final HelixAdmin _helixAdmin;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final int _defaultDeletedSegmentsRetentionInDays;
+ private final long _defaultDeletedSegmentsRetentionMs;
public SegmentDeletionManager(String dataDir, HelixAdmin helixAdmin, String
helixClusterName,
ZkHelixPropertyStore<ZNRecord> propertyStore, int
deletedSegmentsRetentionInDays) {
@@ -68,7 +86,7 @@ public class SegmentDeletionManager {
_helixAdmin = helixAdmin;
_helixClusterName = helixClusterName;
_propertyStore = propertyStore;
- _defaultDeletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+ _defaultDeletedSegmentsRetentionMs =
TimeUnit.DAYS.toMillis(deletedSegmentsRetentionInDays);
_executorService = Executors.newSingleThreadScheduledExecutor(new
ThreadFactory() {
@Override
@@ -84,22 +102,29 @@ public class SegmentDeletionManager {
_executorService.shutdownNow();
}
- public void deleteSegments(final String tableName, final Collection<String>
segmentIds) {
- deleteSegmentsWithDelay(tableName, segmentIds,
DEFAULT_DELETION_DELAY_SECONDS);
+  /**
+   * Asynchronously deletes the given segments of a table, applying the cluster-wide deleted-segments retention.
+   */
+  public void deleteSegments(String tableName, Collection<String> segmentIds) {
+    deleteSegments(tableName, segmentIds, null);
+  }
+
+  /**
+   * Asynchronously deletes the given segments of a table. When {@code tableConfig} carries a parseable
+   * deleted-segments retention period, that period is used for these segments instead of the cluster-wide default.
+   */
+  public void deleteSegments(String tableName, Collection<String> segmentIds,
+      @Nullable TableConfig tableConfig) {
+    deleteSegmentsWithDelay(tableName, segmentIds, getRetentionMsFromTableConfig(tableConfig),
+        DEFAULT_DELETION_DELAY_SECONDS);
   }
- protected void deleteSegmentsWithDelay(final String tableName, final
Collection<String> segmentIds,
- final long deletionDelaySeconds) {
+ protected void deleteSegmentsWithDelay(String tableName, Collection<String>
segmentIds,
+ Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
_executorService.schedule(new Runnable() {
@Override
public void run() {
- deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds,
deletionDelaySeconds);
+ deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds,
deletedSegmentsRetentionMs,
+ deletionDelaySeconds);
}
}, deletionDelaySeconds, TimeUnit.SECONDS);
}
protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String
tableName, Collection<String> segmentIds,
- long deletionDelay) {
+ Long deletedSegmentsRetentionMs, long deletionDelay) {
// Check if segment got removed from ExternalView or IdealState
ExternalView externalView =
_helixAdmin.getResourceExternalView(_helixClusterName, tableName);
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
@@ -151,7 +176,7 @@ public class SegmentDeletionManager {
}
segmentsToDelete.removeAll(propStoreFailedSegs);
- removeSegmentsFromStore(tableName, segmentsToDelete);
+ removeSegmentsFromStore(tableName, segmentsToDelete,
deletedSegmentsRetentionMs);
}
LOGGER.info("Deleted {} segments from table {}:{}",
segmentsToDelete.size(), tableName,
@@ -160,27 +185,35 @@ public class SegmentDeletionManager {
if (!segmentsToRetryLater.isEmpty()) {
long effectiveDeletionDelay = Math.min(deletionDelay * 2,
MAX_DELETION_DELAY_SECONDS);
LOGGER.info("Postponing deletion of {} segments from table {}",
segmentsToRetryLater.size(), tableName);
- deleteSegmentsWithDelay(tableName, segmentsToRetryLater,
effectiveDeletionDelay);
+ deleteSegmentsWithDelay(tableName, segmentsToRetryLater,
deletedSegmentsRetentionMs, effectiveDeletionDelay);
return;
}
}
public void removeSegmentsFromStore(String tableNameWithType, List<String>
segments) {
+ removeSegmentsFromStore(tableNameWithType, segments, null);
+ }
+
+ public void removeSegmentsFromStore(String tableNameWithType, List<String>
segments,
+ @Nullable Long deletedSegmentsRetentionMs) {
for (String segment : segments) {
- removeSegmentFromStore(tableNameWithType, segment);
+ removeSegmentFromStore(tableNameWithType, segment,
deletedSegmentsRetentionMs);
}
}
- protected void removeSegmentFromStore(String tableNameWithType, String
segmentId) {
+ protected void removeSegmentFromStore(String tableNameWithType, String
segmentId,
+ @Nullable Long deletedSegmentsRetentionMs) {
// Ignore HLC segments as they are not stored in Pinot FS
if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
return;
}
if (_dataDir != null) {
+ long retentionMs = deletedSegmentsRetentionMs == null
+ ? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs;
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName,
URIUtils.encode(segmentId));
PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
- if (_defaultDeletedSegmentsRetentionInDays <= 0) {
+ if (retentionMs <= 0) {
// delete the segment file instantly if retention is set to zero
try {
if (pinotFS.delete(fileToDeleteURI, true)) {
@@ -193,8 +226,9 @@ public class SegmentDeletionManager {
}
} else {
// move the segment file to deleted segments first and let retention
manager handler the deletion
- URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir,
DELETED_SEGMENTS, rawTableName,
- URIUtils.encode(segmentId));
+ String deletedFileName = deletedSegmentsRetentionMs == null ?
URIUtils.encode(segmentId)
+ : getDeletedSegmentFileName(URIUtils.encode(segmentId),
deletedSegmentsRetentionMs);
+ URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir,
DELETED_SEGMENTS, rawTableName, deletedFileName);
try {
if (pinotFS.exists(fileToDeleteURI)) {
// Overwrites the file if it already exists in the target
directory.
@@ -223,9 +257,8 @@ public class SegmentDeletionManager {
/**
* Removes aged deleted segments from the deleted directory
- * @param retentionInDays: retention for deleted segments in days
*/
- public void removeAgedDeletedSegments(int retentionInDays) {
+ public void removeAgedDeletedSegments() {
if (_dataDir != null) {
URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
@@ -254,8 +287,8 @@ public class SegmentDeletionManager {
int numFilesDeleted = 0;
for (String targetFile : targetFiles) {
URI targetURI = URIUtils.getUri(targetFile);
- Date dateToDelete =
DateTime.now().minusDays(retentionInDays).toDate();
- if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
+ long deletionTimeMs = getDeletionTimeMsFromFile(targetFile,
pinotFS.lastModified(targetURI));
+ if (System.currentTimeMillis() >= deletionTimeMs) {
if (!pinotFS.delete(targetURI, true)) {
LOGGER.warn("Cannot remove file {} from deleted directory.",
targetURI.toString());
} else {
@@ -278,4 +311,37 @@ public class SegmentDeletionManager {
LOGGER.info("dataDir is not configured, won't delete any expired
segments from deleted directory.");
}
}
+
+  /**
+   * Appends the retention suffix to a deleted segment file name, e.g.
+   * {@code mySegment -> mySegment__RETENTION_UNTIL__202202021200}. The timestamp is "now + retention", formatted in
+   * UTC with minute precision.
+   */
+  private String getDeletedSegmentFileName(String fileName, long deletedSegmentsRetentionMs) {
+    Date retentionUntil = new Date(System.currentTimeMillis() + deletedSegmentsRetentionMs);
+    String retentionUntilStr;
+    // SimpleDateFormat is not thread-safe. This method runs on the deletion executor thread (and through the public
+    // removeSegmentsFromStore()), while getDeletionTimeMsFromFile() runs from removeAgedDeletedSegments() on the
+    // retention manager thread, so every use of the shared formatter is guarded by its monitor.
+    synchronized (RETENTION_DATE_FORMAT) {
+      retentionUntilStr = RETENTION_DATE_FORMAT.format(retentionUntil);
+    }
+    return fileName + RETENTION_UNTIL_SEPARATOR + retentionUntilStr;
+  }
+
+  /**
+   * Returns the epoch millis after which a file in the deleted segments directory may be permanently removed.
+   * The retention-until suffix encoded in the file name is used when present and parseable; otherwise the deadline
+   * falls back to the file's last modified time plus the cluster-wide default retention.
+   *
+   * @param targetFile path of the file in the deleted segments directory
+   * @param lastModifiedTime last modified time of that file, in epoch millis
+   */
+  private long getDeletionTimeMsFromFile(String targetFile, long lastModifiedTime) {
+    String[] split = StringUtils.splitByWholeSeparator(targetFile, RETENTION_UNTIL_SEPARATOR);
+    if (split.length == 2) {
+      try {
+        // Guard the shared, non-thread-safe SimpleDateFormat (see getDeletedSegmentFileName()).
+        synchronized (RETENTION_DATE_FORMAT) {
+          return RETENTION_DATE_FORMAT.parse(split[1]).getTime();
+        }
+      } catch (Exception e) {
+        // A suffix exists here but could not be parsed; keep the cause so the bad file name can be diagnosed.
+        LOGGER.warn("Unable to parse retention suffix of file: {}", targetFile, e);
+      }
+    }
+    LOGGER.info("Fallback to using default cluster retention config: {} ms", _defaultDeletedSegmentsRetentionMs);
+    return lastModifiedTime + _defaultDeletedSegmentsRetentionMs;
+  }
+
+  /**
+   * Returns the table-level deleted-segments retention in milliseconds. Returns {@code null} (meaning the
+   * cluster-wide default retention applies) in any of these cases:
+   * <ul>
+   *   <li>the table config or its validation config is absent;</li>
+   *   <li>the retention period is empty;</li>
+   *   <li>the retention period cannot be parsed.</li>
+   * </ul>
+   */
+  @Nullable
+  private static Long getRetentionMsFromTableConfig(@Nullable TableConfig tableConfig) {
+    if (tableConfig == null) {
+      return null;
+    }
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    if (validationConfig == null) {
+      return null;
+    }
+    String retentionPeriod = validationConfig.getDeletedSegmentsRetentionPeriod();
+    if (StringUtils.isEmpty(retentionPeriod)) {
+      return null;
+    }
+    try {
+      return TimeUtils.convertPeriodToMillis(retentionPeriod);
+    } catch (Exception e) {
+      // Best effort: a malformed table-level value must not block deletion; fall back to the cluster default.
+      LOGGER.warn("Unable to parse deleted segment retention config for table {}", tableConfig.getTableName(), e);
+      return null;
+    }
+  }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index d66d02d..bf24f78 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -64,17 +64,13 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(RetentionManager.class);
- private final int _deletedSegmentsRetentionInDays;
-
public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics) {
super("RetentionManager",
config.getRetentionControllerFrequencyInSeconds(),
config.getRetentionManagerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
- _deletedSegmentsRetentionInDays =
config.getDeletedSegmentsRetentionInDays();
- LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {},
deletedSegmentsRetentionInDays: {}",
- getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
+ LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}",
getIntervalInSeconds());
}
@Override
@@ -97,8 +93,8 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
@Override
protected void postprocess() {
- LOGGER.info("Removing aged (more than {} days) deleted segments for all
tables", _deletedSegmentsRetentionInDays);
-
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+ LOGGER.info("Removing aged deleted segments for all tables");
+
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments();
}
private void manageRetentionForTable(TableConfig tableConfig) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index ef69d84..ce90263 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -49,7 +49,6 @@ import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
@@ -103,7 +102,7 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
// Verify that the removeAgedDeletedSegments() method in deletion manager
is actually called.
- verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
+ verify(deletionManager, times(1)).removeAgedDeletedSegments();
// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(),
anyList());
@@ -177,7 +176,7 @@ public class RetentionManagerTest {
throws Throwable {
return null;
}
- }).when(deletionManager).removeAgedDeletedSegments(anyInt());
+ }).when(deletionManager).removeAgedDeletedSegments();
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
// If and when PinotHelixResourceManager.deleteSegments() is invoked, make
sure that the segments deleted
@@ -229,7 +228,7 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
// Verify that the removeAgedDeletedSegments() method in deletion manager
is actually called.
- verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
+ verify(deletionManager, times(1)).removeAgedDeletedSegments();
// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(),
anyList());
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index 0984a14..8d56e7c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -20,23 +20,32 @@ package org.apache.pinot.controller.helix.core.util;
import com.google.common.io.Files;
import java.io.File;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
import org.joda.time.DateTime;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -51,8 +60,17 @@ import static org.mockito.Mockito.when;
public class SegmentDeletionManagerTest {
- private final static String TABLE_NAME = "table";
- private final static String CLUSTER_NAME = "mock";
+ private static final String TABLE_NAME = "table";
+ private static final String CLUSTER_NAME = "mock";
+  // These constants must match the constants of the same names in SegmentDeletionManager, which writes the
+  // retention-until suffix onto deleted segment file names.
+  private static final String RETENTION_UNTIL_SEPARATOR = "__RETENTION_UNTIL__";
+  private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm";
+  private static final SimpleDateFormat RETENTION_DATE_FORMAT;
+
+  static {
+    RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
+    RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
HelixAdmin makeHelixAdmin() {
HelixAdmin admin = mock(HelixAdmin.class);
@@ -208,10 +226,11 @@ public class SegmentDeletionManagerTest {
ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
File tempDir = Files.createTempDir();
tempDir.deleteOnExit();
- FakeDeletionManager deletionManager = new
FakeDeletionManager(tempDir.getAbsolutePath(), helixAdmin, propertyStore);
+ FakeDeletionManager deletionManager = new FakeDeletionManager(
+ tempDir.getAbsolutePath(), helixAdmin, propertyStore, 7);
// Test delete when deleted segments directory does not exists
- deletionManager.removeAgedDeletedSegments(1);
+ deletionManager.removeAgedDeletedSegments();
// Create deleted directory
String deletedDirectoryPath = tempDir + File.separator +
"Deleted_Segments";
@@ -219,29 +238,36 @@ public class SegmentDeletionManagerTest {
deletedDirectory.mkdir();
// Test delete when deleted segments directory is empty
- deletionManager.removeAgedDeletedSegments(1);
+ deletionManager.removeAgedDeletedSegments();
// Create dummy directories and files
File dummyDir1 = new File(deletedDirectoryPath + File.separator +
"dummy1");
dummyDir1.mkdir();
File dummyDir2 = new File(deletedDirectoryPath + File.separator +
"dummy2");
dummyDir2.mkdir();
+ File dummyDir3 = new File(deletedDirectoryPath + File.separator +
"dummy3");
+ dummyDir3.mkdir();
// Test delete when there is no files but some directories exist
- deletionManager.removeAgedDeletedSegments(1);
+ deletionManager.removeAgedDeletedSegments();
Assert.assertEquals(dummyDir1.exists(), false);
Assert.assertEquals(dummyDir2.exists(), false);
+ Assert.assertEquals(dummyDir3.exists(), false);
- // Create dummy directories and files
+ // Create dummy directories and files again
dummyDir1.mkdir();
dummyDir2.mkdir();
+ dummyDir3.mkdir();
// Create dummy files
for (int i = 0; i < 3; i++) {
- createTestFileWithAge(dummyDir1.getAbsolutePath() + File.separator +
"file" + i, i);
+ createTestFileWithAge(dummyDir1.getAbsolutePath() + File.separator +
genDeletedSegmentName("file" + i, i, 1), i);
}
for (int i = 2; i < 5; i++) {
- createTestFileWithAge(dummyDir2.getAbsolutePath() + File.separator +
"file" + i, i);
+ createTestFileWithAge(dummyDir2.getAbsolutePath() + File.separator +
genDeletedSegmentName("file" + i, i, 1), i);
+ }
+ for (int i = 6; i < 9; i++) {
+ createTestFileWithAge(dummyDir3.getAbsolutePath() + File.separator +
"file" + i, i);
}
// Sleep 1 second to ensure the clock moves.
@@ -250,18 +276,92 @@ public class SegmentDeletionManagerTest {
// Check that dummy directories and files are successfully created.
Assert.assertEquals(dummyDir1.list().length, 3);
Assert.assertEquals(dummyDir2.list().length, 3);
+ Assert.assertEquals(dummyDir3.list().length, 3);
- // Try to remove files with the retention of 3 days.
- deletionManager.removeAgedDeletedSegments(3);
- Assert.assertEquals(dummyDir1.list().length, 3);
- Assert.assertEquals(dummyDir2.list().length, 1);
+ // Try to remove files with the retention of 1 days.
+ deletionManager.removeAgedDeletedSegments();
- // Try to further remove files with the retention of 1 days.
- deletionManager.removeAgedDeletedSegments(1);
+ // Check that only 1 day retention file is remaining
Assert.assertEquals(dummyDir1.list().length, 1);
// Check that empty directory has successfully been removed.
Assert.assertEquals(dummyDir2.exists(), false);
+
+ // Check that deleted file without retention suffix is honoring
cluster-wide retention period of 7 days.
+ Assert.assertEquals(dummyDir3.list().length, 1);
+ }
+
+  /**
+   * Verifies the two deletion paths:
+   * <ul>
+   *   <li>A table-level deleted-segments retention of {@code "0d"} removes segment files outright.</li>
+   *   <li>The cluster-wide default retention (7 days, passed to the constructor below) moves segment files into
+   *   {@code Deleted_Segments/<table>} instead.</li>
+   * </ul>
+   */
+  @Test
+  public void testSegmentDeletionLogic()
+      throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY + ".class",
+        LocalPinotFS.class.getName());
+    PinotFSFactory.init(new PinotConfiguration(properties));
+
+    HelixAdmin helixAdmin = makeHelixAdmin();
+    ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+    SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+        tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
+
+    // create table segment files.
+    Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
+    createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted());
+    final File tableDir = new File(tempDir.getAbsolutePath() + File.separator + TABLE_NAME);
+    final File deletedTableDir = new File(tempDir.getAbsolutePath() + File.separator + "Deleted_Segments"
+        + File.separator + TABLE_NAME);
+
+    // delete the segments instantly: a table-level retention of "0d" bypasses the Deleted_Segments directory.
+    SegmentsValidationAndRetentionConfig mockValidationConfig = mock(SegmentsValidationAndRetentionConfig.class);
+    when(mockValidationConfig.getDeletedSegmentsRetentionPeriod()).thenReturn("0d");
+    TableConfig mockTableConfig = mock(TableConfig.class);
+    when(mockTableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
+    deletionManager.deleteSegments(TABLE_NAME, segments, mockTableConfig);
+
+    // Deletion is scheduled asynchronously, so poll until the table directory is empty and nothing was moved
+    // into Deleted_Segments.
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        Assert.assertEquals(tableDir.listFiles().length, 0);
+        Assert.assertTrue(!deletedTableDir.exists() || deletedTableDir.listFiles().length == 0);
+        return true;
+      } catch (Throwable t) {
+        return false;
+      }
+    }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+
+    // create table segment files again to test default retention.
+    createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted());
+    // delete the segments with default retention: without a table config the files are moved (not removed)
+    // into Deleted_Segments/<table>.
+    deletionManager.deleteSegments(TABLE_NAME, segments);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        Assert.assertEquals(tableDir.listFiles().length, 0);
+        Assert.assertEquals(deletedTableDir.listFiles().length, segments.size());
+        return true;
+      } catch (Throwable t) {
+        return false;
+      }
+    }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+  }
+
+  /**
+   * Creates the directory {@code <tempDir>/<TABLE_NAME>} and, inside it, one test file (age 0) per segment id.
+   */
+  public void createTableAndSegmentFiles(File tempDir, List<String> segmentIds)
+      throws Exception {
+    File tableDirectory = new File(tempDir.getAbsolutePath() + File.separator + TABLE_NAME);
+    tableDirectory.mkdir();
+    String segmentPathPrefix = tableDirectory.getAbsolutePath() + File.separator;
+    for (int i = 0; i < segmentIds.size(); i++) {
+      createTestFileWithAge(segmentPathPrefix + segmentIds.get(i), 0);
+    }
+  }
+
+  /**
+   * Builds a deleted-segment file name of the form {@code <fileName>__RETENTION_UNTIL__<yyyyMMddHHmm>}, as if the
+   * file had been deleted {@code age} days ago with a retention of {@code retentionInDays} days.
+   */
+  public String genDeletedSegmentName(String fileName, int age, int retentionInDays) {
+    // Subtract one hour from the computed deadline (now - age days + retention days). This puts a file whose age
+    // equals its retention safely past its deadline when the test checks it. The suffix format keeps only minute
+    // precision (no seconds), so the deadline cannot be pinned exactly to "now".
+    return StringUtils.join(fileName, RETENTION_UNTIL_SEPARATOR, RETENTION_DATE_FORMAT.format(new Date(
+        DateTime.now().minusDays(age).getMillis()
+        + TimeUnit.DAYS.toMillis(retentionInDays)
+        - TimeUnit.HOURS.toMillis(1))));
   }
public void createTestFileWithAge(String path, int age)
@@ -280,22 +380,24 @@ public class SegmentDeletionManagerTest {
super(null, helixAdmin, CLUSTER_NAME, propertyStore, 0);
}
- FakeDeletionManager(String localDiskDir, HelixAdmin helixAdmin,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
- super(localDiskDir, helixAdmin, CLUSTER_NAME, propertyStore, 0);
+ FakeDeletionManager(String localDiskDir, HelixAdmin helixAdmin,
ZkHelixPropertyStore<ZNRecord> propertyStore, int
+ deletedSegmentsRetentionInDays) {
+ super(localDiskDir, helixAdmin, CLUSTER_NAME, propertyStore,
deletedSegmentsRetentionInDays);
}
public void deleteSegmentsFromPropertyStoreAndLocal(String tableName,
Collection<String> segments) {
- super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L);
+ super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L,
0L);
}
@Override
- protected void removeSegmentFromStore(String tableName, String segmentId) {
+ protected void removeSegmentFromStore(String tableName, String segmentId,
+ @Nullable Long deletedSegmentsRetentionMs) {
_segmentsRemovedFromStore.add(segmentId);
}
@Override
- protected void deleteSegmentsWithDelay(final String tableName, final
Collection<String> segmentIds,
- final long deletionDelaySeconds) {
+ protected void deleteSegmentsWithDelay(String tableName,
Collection<String> segmentIds,
+ @Nullable Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
_segmentsToRetry.addAll(segmentIds);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index befc3e2..b57cef1 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -29,6 +29,7 @@ import org.apache.pinot.spi.utils.TimeUtils;
public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
private String _retentionTimeUnit;
private String _retentionTimeValue;
+ private String _deletedSegmentsRetentionPeriod;
@Deprecated
private String _segmentPushFrequency; // DO NOT REMOVE, this is used in
internal segment generation management
@Deprecated
@@ -102,6 +103,14 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
_retentionTimeValue = retentionTimeValue;
}
+  /**
+   * Returns the retention period applied to this table's deleted segment files, as a period string such as
+   * {@code "7d"}, or {@code null} when it has not been set.
+   */
+  public String getDeletedSegmentsRetentionPeriod() {
+    return this._deletedSegmentsRetentionPeriod;
+  }
+
+  /**
+   * Sets the retention period applied to this table's deleted segment files, as a period string such as
+   * {@code "7d"}.
+   */
+  public void setDeletedSegmentsRetentionPeriod(String deletedSegmentsRetentionPeriod) {
+    this._deletedSegmentsRetentionPeriod = deletedSegmentsRetentionPeriod;
+  }
+
/**
* @deprecated Use {@code segmentIngestionFrequency} from {@link
IngestionConfig#getBatchIngestionConfig()}
*/
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index ba8a07b..8871c6e 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -50,6 +50,7 @@ public class TableConfigBuilder {
private static final String DEFAULT_SEGMENT_PUSH_TYPE = "APPEND";
private static final String REFRESH_SEGMENT_PUSH_TYPE = "REFRESH";
private static final String DEFAULT_SEGMENT_ASSIGNMENT_STRATEGY =
"BalanceNumSegmentAssignmentStrategy";
+ private static final String DEFAULT_DELETED_SEGMENTS_RETENTION_PERIOD = "7d";
private static final String DEFAULT_NUM_REPLICAS = "1";
private static final String DEFAULT_LOAD_MODE = "HEAP";
private static final String MMAP_LOAD_MODE = "MMAP";
@@ -67,6 +68,7 @@ public class TableConfigBuilder {
private boolean _allowNullTimeValue;
private String _retentionTimeUnit;
private String _retentionTimeValue;
+ private String _deletedSegmentsRetentionPeriod =
DEFAULT_DELETED_SEGMENTS_RETENTION_PERIOD;
@Deprecated
private String _segmentPushFrequency;
@Deprecated
@@ -373,6 +375,7 @@ public class TableConfigBuilder {
validationConfig.setAllowNullTimeValue(_allowNullTimeValue);
validationConfig.setRetentionTimeUnit(_retentionTimeUnit);
validationConfig.setRetentionTimeValue(_retentionTimeValue);
+
validationConfig.setDeletedSegmentsRetentionPeriod(_deletedSegmentsRetentionPeriod);
validationConfig.setSegmentPushFrequency(_segmentPushFrequency);
validationConfig.setSegmentPushType(_segmentPushType);
validationConfig.setSegmentAssignmentStrategy(_segmentAssignmentStrategy);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]