This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 23197e26ac HDDS-11605. Directory deletion service should support multiple threads (#7349)
23197e26ac is described below
commit 23197e26ac3fb4e2d3e6d66ddb96fe45714b165c
Author: Aryan Gupta <[email protected]>
AuthorDate: Mon Dec 9 17:42:48 2024 +0530
HDDS-11605. Directory deletion service should support multiple threads (#7349)
---
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 5 +
.../ozone/TestDirectoryDeletingServiceWithFSO.java | 4 +-
.../hadoop/ozone/TestOzoneConfigurationFields.java | 1 +
...TestSnapshotDeletingServiceIntegrationTest.java | 8 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 16 +-
.../om/service/AbstractKeyDeletingService.java | 10 +-
.../ozone/om/service/DirectoryDeletingService.java | 227 +++++++++++++--------
.../om/service/TestDirectoryDeletingService.java | 12 +-
8 files changed, 187 insertions(+), 96 deletions(-)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 2cf5e4ced3..880fe8614b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -421,6 +421,11 @@ public final class OMConfigKeys {
// resulting 24MB
public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 6000;
+ public static final String OZONE_THREAD_NUMBER_DIR_DELETION =
+ "ozone.thread.number.dir.deletion";
+
+ public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10;
+
public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
"ozone.snapshot.filtering.limit.per.task";
public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
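For context, a minimal usage sketch of the new key (not part of this patch; OzoneConfiguration.setInt is the standard Hadoop Configuration setter, and the clamp mentioned in the comment comes from the KeyManagerImpl hunk further down):

    // Sketch: run the directory deleting service with 4 worker tasks.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setInt(OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION, 4);
    // Values <= 0 are treated as 1 by KeyManagerImpl; leaving the key unset
    // falls back to OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT (10).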
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
index 8d161dedeb..78fb4c66fc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
@@ -529,7 +529,7 @@ public class TestDirectoryDeletingServiceWithFSO {
when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> omSnapshotManager);
DirectoryDeletingService service = Mockito.spy(new DirectoryDeletingService(1000, TimeUnit.MILLISECONDS, 1000, ozoneManager,
- cluster.getConf()));
+ cluster.getConf(), 1));
service.shutdown();
final int initialSnapshotCount =
(int) cluster.getOzoneManager().getMetadataManager().countRowsInTable(snapshotInfoTable);
@@ -563,7 +563,7 @@ public class TestDirectoryDeletingServiceWithFSO {
}
return i.callRealMethod();
}).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(), anyLong(),
- anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), Mockito.any(), any());
+ anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), Mockito.any(), any(), anyLong());
Mockito.doAnswer(i -> {
store.createSnapshot(testVolumeName, testBucketName, snap2);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 8a219514d3..1fbfc1f1f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -126,6 +126,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
+ OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION,
ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
ScmConfigKeys.OZONE_SCM_HA_PREFIX,
S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
index a9e5faa041..c3a58a1a21 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
@@ -480,22 +480,22 @@ public class TestSnapshotDeletingServiceIntegrationTest {
private DirectoryDeletingService getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted,
AtomicBoolean dirDeletionStarted)
- throws InterruptedException, TimeoutException {
+ throws InterruptedException, TimeoutException, IOException {
OzoneManager ozoneManager = Mockito.spy(om);
om.getKeyManager().getDirDeletingService().shutdown();
GenericTestUtils.waitFor(() -> om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000,
100000);
DirectoryDeletingService directoryDeletingService = Mockito.spy(new DirectoryDeletingService(10000,
- TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf()));
+ TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf(), 1));
directoryDeletingService.shutdown();
GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount() == 0, 1000,
100000);
- when(ozoneManager.getMetadataManager()).thenAnswer(i -> {
+ doAnswer(i -> {
// Wait for SDS to reach DDS wait block before processing any deleted directories.
GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000);
dirDeletionStarted.set(true);
return i.callRealMethod();
- });
+ }).when(directoryDeletingService).getPendingDeletedDirInfo();
return directoryDeletingService;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 7532cf8b32..ccda21efc9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -153,6 +153,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_S
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -257,8 +259,16 @@ public class KeyManagerImpl implements KeyManager {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- dirDeletingService = new DirectoryDeletingService(dirDeleteInterval,
- TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
+ int dirDeletingServiceCorePoolSize =
+ configuration.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
+ OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
+ if (dirDeletingServiceCorePoolSize <= 0) {
+ dirDeletingServiceCorePoolSize = 1;
+ }
+ dirDeletingService =
+ new DirectoryDeletingService(dirDeleteInterval, TimeUnit.MILLISECONDS,
+ serviceTimeout, ozoneManager, configuration,
+ dirDeletingServiceCorePoolSize);
dirDeletingService.start();
}
@@ -2052,7 +2062,7 @@ public class KeyManagerImpl implements KeyManager {
parentInfo.getObjectID(), "");
long countEntries = 0;
- Table dirTable = metadataManager.getDirectoryTable();
+ Table<String, OmDirectoryInfo> dirTable = metadataManager.getDirectoryTable();
try (TableIterator<String,
? extends Table.KeyValue<String, OmDirectoryInfo>>
iterator = dirTable.iterator()) {
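For context, the KeyManagerImpl wiring above condenses to the following sketch (names such as dirDeleteInterval, serviceTimeout, ozoneManager and configuration stand in for the fields already in scope in KeyManagerImpl; Math.max is used here as shorthand for the patch's <= 0 guard):

    int poolSize = configuration.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
        OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);   // 10 unless overridden
    poolSize = Math.max(poolSize, 1);                // never fewer than one worker
    dirDeletingService = new DirectoryDeletingService(dirDeleteInterval,
        TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration, poolSize);
    dirDeletingService.start();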
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 76c16232e3..0ac6c98660 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -280,7 +280,7 @@ public abstract class AbstractKeyDeletingService extends BackgroundService
protected void submitPurgePaths(List<PurgePathRequest> requests,
String snapTableKey,
- UUID expectedPreviousSnapshotId) {
+ UUID expectedPreviousSnapshotId, long rnCnt) {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
@@ -305,7 +305,7 @@ public abstract class AbstractKeyDeletingService extends BackgroundService
// Submit Purge paths request to OM
try {
- OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, rnCnt);
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.", e);
}
@@ -400,7 +400,7 @@ public abstract class AbstractKeyDeletingService extends BackgroundService
List<PurgePathRequest> purgePathRequestList,
String snapTableKey, long startTime,
int remainingBufLimit, KeyManager keyManager,
- UUID expectedPreviousSnapshotId) {
+ UUID expectedPreviousSnapshotId, long rnCnt) {
// Optimization to handle delete sub-dir and keys to remove quickly
// This case will be useful to handle when depth of directory is high
@@ -442,7 +442,7 @@ public abstract class AbstractKeyDeletingService extends BackgroundService
}
if (!purgePathRequestList.isEmpty()) {
- submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId);
+ submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, rnCnt);
}
if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -455,7 +455,7 @@ public abstract class AbstractKeyDeletingService extends BackgroundService
"DeletedDirectoryTable, iteration elapsed: {}ms," +
" totalRunCount: {}",
dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum),
- Time.monotonicNow() - startTime, getRunCount());
+ Time.monotonicNow() - startTime, rnCnt);
}
return remainNum;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 09f4a8f8a3..a8270f92f2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -49,6 +50,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -74,10 +76,10 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
public static final Logger LOG =
LoggerFactory.getLogger(DirectoryDeletingService.class);
- // Use only a single thread for DirDeletion. Multiple threads would read
- // or write to same tables and can send deletion requests for same key
- // multiple times.
- private static final int DIR_DELETING_CORE_POOL_SIZE = 1;
+ // Using multi thread for DirDeletion. Multiple threads would read
+ // from parent directory info from deleted directory table concurrently
+ // and send deletion requests.
+ private final int dirDeletingCorePoolSize;
private static final int MIN_ERR_LIMIT_PER_TASK = 1000;
// Number of items(dirs/files) to be batched in an iteration.
@@ -86,11 +88,15 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
private final AtomicBoolean suspended;
private AtomicBoolean isRunningOnAOS;
+ private final DeletedDirSupplier deletedDirSupplier;
+
+ private AtomicInteger taskCount = new AtomicInteger(0);
+
public DirectoryDeletingService(long interval, TimeUnit unit,
long serviceTimeout, OzoneManager ozoneManager,
- OzoneConfiguration configuration) {
+ OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize) {
super(DirectoryDeletingService.class.getSimpleName(), interval, unit,
- DIR_DELETING_CORE_POOL_SIZE, serviceTimeout, ozoneManager, null);
+ dirDeletingServiceCorePoolSize, serviceTimeout, ozoneManager, null);
this.pathLimitPerTask = configuration
.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
@@ -102,6 +108,9 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
this.ratisByteLimit = (int) (limit * 0.9);
this.suspended = new AtomicBoolean(false);
this.isRunningOnAOS = new AtomicBoolean(false);
+ this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
+ deletedDirSupplier = new DeletedDirSupplier();
+ taskCount.set(0);
}
private boolean shouldRun() {
@@ -116,6 +125,10 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
return isRunningOnAOS.get();
}
+ public AtomicInteger getTaskCount() {
+ return taskCount;
+ }
+
/**
* Suspend the service.
*/
@@ -135,10 +148,55 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+ if (taskCount.get() > 0) {
+ LOG.info("{} Directory deleting task(s) already in progress.",
+ taskCount.get());
+ return queue;
+ }
+ try {
+ deletedDirSupplier.reInitItr();
+ } catch (IOException ex) {
+ LOG.error("Unable to get the iterator.", ex);
+ return queue;
+ }
+ taskCount.set(dirDeletingCorePoolSize);
+ for (int i = 0; i < dirDeletingCorePoolSize; i++) {
+ queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+ }
return queue;
}
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ deletedDirSupplier.closeItr();
+ }
+
+ private final class DeletedDirSupplier {
+ private TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
+ deleteTableIterator;
+
+ private synchronized Table.KeyValue<String, OmKeyInfo> get()
+ throws IOException {
+ if (deleteTableIterator.hasNext()) {
+ return deleteTableIterator.next();
+ }
+ return null;
+ }
+
+ private synchronized void closeItr() {
+ IOUtils.closeQuietly(deleteTableIterator);
+ deleteTableIterator = null;
+ }
+
+ private synchronized void reInitItr() throws IOException {
+ closeItr();
+ deleteTableIterator =
+ getOzoneManager().getMetadataManager().getDeletedDirTable()
+ .iterator();
+ }
+ }
+
private final class DirDeletingTask implements BackgroundTask {
private final DirectoryDeletingService directoryDeletingService;
@@ -153,89 +211,93 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
@Override
public BackgroundTaskResult call() {
- if (shouldRun()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running DirectoryDeletingService");
- }
- isRunningOnAOS.set(true);
- getRunCount().incrementAndGet();
- long dirNum = 0L;
- long subDirNum = 0L;
- long subFileNum = 0L;
- long remainNum = pathLimitPerTask;
- int consumedSize = 0;
- List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
- List<Pair<String, OmKeyInfo>> allSubDirList
- = new ArrayList<>((int) remainNum);
-
- Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
-
- try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
- deleteTableIterator = getOzoneManager().getMetadataManager().
- getDeletedDirTable().iterator()) {
+ try {
+ if (shouldRun()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running DirectoryDeletingService");
+ }
+ isRunningOnAOS.set(true);
+ long rnCnt = getRunCount().incrementAndGet();
+ long dirNum = 0L;
+ long subDirNum = 0L;
+ long subFileNum = 0L;
+ long remainNum = pathLimitPerTask;
+ int consumedSize = 0;
+ List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+ List<Pair<String, OmKeyInfo>> allSubDirList =
+ new ArrayList<>((int) remainNum);
+
+ Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
// This is to avoid race condition b/w purge request and snapshot chain updation. For AOS taking the global
// snapshotId since AOS could process multiple buckets in one iteration.
- UUID expectedPreviousSnapshotId =
- ((OmMetadataManagerImpl)getOzoneManager().getMetadataManager()).getSnapshotChainManager()
- .getLatestGlobalSnapshotId();
-
- long startTime = Time.monotonicNow();
- while (remainNum > 0 && deleteTableIterator.hasNext()) {
- pendingDeletedDirInfo = deleteTableIterator.next();
- // Do not reclaim if the directory is still being referenced by
- // the previous snapshot.
- if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
- continue;
- }
+ try {
+ UUID expectedPreviousSnapshotId =
+ ((OmMetadataManagerImpl) getOzoneManager().getMetadataManager()).getSnapshotChainManager()
+ .getLatestGlobalSnapshotId();
- PurgePathRequest request = prepareDeleteDirRequest(
- remainNum, pendingDeletedDirInfo.getValue(),
- pendingDeletedDirInfo.getKey(), allSubDirList,
- getOzoneManager().getKeyManager());
- if (isBufferLimitCrossed(ratisByteLimit, consumedSize,
- request.getSerializedSize())) {
- if (purgePathRequestList.size() != 0) {
- // if message buffer reaches max limit, avoid sending further
- remainNum = 0;
+ long startTime = Time.monotonicNow();
+ while (remainNum > 0) {
+ pendingDeletedDirInfo = getPendingDeletedDirInfo();
+ if (pendingDeletedDirInfo == null) {
break;
}
- // if directory itself is having a lot of keys / files,
- // reduce capacity to minimum level
- remainNum = MIN_ERR_LIMIT_PER_TASK;
- request = prepareDeleteDirRequest(
- remainNum, pendingDeletedDirInfo.getValue(),
+ // Do not reclaim if the directory is still being referenced by
+ // the previous snapshot.
+ if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
+ continue;
+ }
+
+ PurgePathRequest request = prepareDeleteDirRequest(remainNum,
+ pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), allSubDirList,
getOzoneManager().getKeyManager());
+ if (isBufferLimitCrossed(ratisByteLimit, consumedSize,
+ request.getSerializedSize())) {
+ if (purgePathRequestList.size() != 0) {
+ // if message buffer reaches max limit, avoid sending further
+ remainNum = 0;
+ break;
+ }
+ // if directory itself is having a lot of keys / files,
+ // reduce capacity to minimum level
+ remainNum = MIN_ERR_LIMIT_PER_TASK;
+ request = prepareDeleteDirRequest(remainNum,
+ pendingDeletedDirInfo.getValue(),
+ pendingDeletedDirInfo.getKey(), allSubDirList,
+ getOzoneManager().getKeyManager());
+ }
+ consumedSize += request.getSerializedSize();
+ purgePathRequestList.add(request);
+ // reduce remain count for self, sub-files, and sub-directories
+ remainNum = remainNum - 1;
+ remainNum = remainNum - request.getDeletedSubFilesCount();
+ remainNum = remainNum - request.getMarkDeletedSubDirsCount();
+ // Count up the purgeDeletedDir, subDirs and subFiles
+ if (request.getDeletedDir() != null && !request.getDeletedDir()
+ .isEmpty()) {
+ dirNum++;
+ }
+ subDirNum += request.getMarkDeletedSubDirsCount();
+ subFileNum += request.getDeletedSubFilesCount();
}
- consumedSize += request.getSerializedSize();
- purgePathRequestList.add(request);
- // reduce remain count for self, sub-files, and sub-directories
- remainNum = remainNum - 1;
- remainNum = remainNum - request.getDeletedSubFilesCount();
- remainNum = remainNum - request.getMarkDeletedSubDirsCount();
- // Count up the purgeDeletedDir, subDirs and subFiles
- if (request.getDeletedDir() != null
- && !request.getDeletedDir().isEmpty()) {
- dirNum++;
- }
- subDirNum += request.getMarkDeletedSubDirsCount();
- subFileNum += request.getDeletedSubFilesCount();
- }
+ optimizeDirDeletesAndSubmitRequest(remainNum, dirNum, subDirNum,
+ subFileNum, allSubDirList, purgePathRequestList, null,
+ startTime, ratisByteLimit - consumedSize,
+ getOzoneManager().getKeyManager(), expectedPreviousSnapshotId,
+ rnCnt);
- optimizeDirDeletesAndSubmitRequest(
- remainNum, dirNum, subDirNum, subFileNum,
- allSubDirList, purgePathRequestList, null, startTime,
- ratisByteLimit - consumedSize,
- getOzoneManager().getKeyManager(), expectedPreviousSnapshotId);
-
- } catch (IOException e) {
- LOG.error("Error while running delete directories and files " +
- "background task. Will retry at next run.", e);
- }
- isRunningOnAOS.set(false);
- synchronized (directoryDeletingService) {
- this.directoryDeletingService.notify();
+ } catch (IOException e) {
+ LOG.error(
+ "Error while running delete directories and files " +
"background task. Will retry at next run.",
+ e);
+ }
+ isRunningOnAOS.set(false);
+ synchronized (directoryDeletingService) {
+ this.directoryDeletingService.notify();
+ }
}
+ } finally {
+ taskCount.getAndDecrement();
}
// place holder by returning empty results of this call back.
return BackgroundTaskResult.EmptyTaskResult.newResult();
@@ -301,4 +363,9 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
}
}
+ public KeyValue<String, OmKeyInfo> getPendingDeletedDirInfo()
+ throws IOException {
+ return deletedDirSupplier.get();
+ }
+
}
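The core of the change is in getTasks() and DeletedDirSupplier above: each run now enqueues dirDeletingCorePoolSize identical tasks that drain one shared iterator over the deleted directory table, and taskCount keeps the next run from starting while any task from the previous batch is still in flight. A standalone sketch of that pattern, using hypothetical names (SharedDirSupplier, DirDeletionFanOut) and no Ozone types, not code from the patch:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    final class SharedDirSupplier<T> {
      private Iterator<T> itr;
      synchronized void reInit(Iterator<T> fresh) { itr = fresh; }
      synchronized T next() { return itr != null && itr.hasNext() ? itr.next() : null; }
    }

    final class DirDeletionFanOut<T> {
      private final SharedDirSupplier<T> supplier = new SharedDirSupplier<>();
      private final AtomicInteger taskCount = new AtomicInteger();

      // Mirrors getTasks(): skip this interval while tasks are still in flight,
      // otherwise re-initialise the shared iterator and queue poolSize workers.
      List<Runnable> buildTasks(Iterator<T> source, int poolSize) {
        List<Runnable> tasks = new ArrayList<>();
        if (taskCount.get() > 0) {
          return tasks;                      // previous batch still running
        }
        supplier.reInit(source);
        taskCount.set(poolSize);
        for (int i = 0; i < poolSize; i++) {
          tasks.add(() -> {
            try {
              for (T dir = supplier.next(); dir != null; dir = supplier.next()) {
                // build and submit a purge request for one pending deleted directory
              }
            } finally {
              taskCount.decrementAndGet();   // same role as the finally block in DirDeletingTask
            }
          });
        }
        return tasks;
      }
    }

Because next() is synchronized, each deleted-directory entry is handed to exactly one worker, which is what lets several threads run without issuing duplicate purge requests for the same key.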
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 04e8efa7b7..681b24b8e4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -51,6 +51,9 @@ import org.junit.jupiter.api.io.TempDir;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -202,14 +205,19 @@ public class TestDirectoryDeletingService {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
.setDataSize(0).setRecursive(true).build();
writeClient.deleteKey(delArgs);
+ int pathDelLimit = conf.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
+ OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
+ int numThread = conf.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
+ OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
// check if difference between each run should not cross the directory deletion limit
// and wait till all dir is removed
GenericTestUtils.waitFor(() -> {
delDirCnt[1] = dirDeletingService.getDeletedDirsCount();
- assertTrue(delDirCnt[1] - delDirCnt[0] <= OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT,
+ assertTrue(
+ delDirCnt[1] - delDirCnt[0] <= ((long) pathDelLimit * numThread),
"base: " + delDirCnt[0] + ", new: " + delDirCnt[1]);
- delDirCnt[0] = delDirCnt[1];
+ delDirCnt[0] = delDirCnt[1];
return dirDeletingService.getDeletedDirsCount() >= dirCreatesCount;
}, 500, 300000);
}
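For context, assuming the test configuration leaves both keys at the defaults introduced here, the bound the test now allows between two reads of getDeletedDirsCount() works out to OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT * OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 6000 * 10 = 60000 directories per interval, versus 6000 for the old single-threaded service; setting ozone.thread.number.dir.deletion to 1 restores the previous bound.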
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]