This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5a8cc7b5a9 HDDS-8389. [Snapshot] Added integration test for SnapDiff
when OM leader failover happens (#4657)
5a8cc7b5a9 is described below
commit 5a8cc7b5a9c6a830b13f47ff427346dc8ce34945
Author: Hemant Kumar <[email protected]>
AuthorDate: Tue May 23 16:54:18 2023 -0700
HDDS-8389. [Snapshot] Added integration test for SnapDiff when OM leader
failover happens (#4657)
---
.../org/apache/hadoop/ozone/om/TestOmSnapshot.java | 197 ++++++++++++++-------
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 2 +
.../ozone/om/TestOzoneManagerHASnapshot.java | 100 ++++++++++-
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 6 +
.../ozone/om/snapshot/SnapshotDiffManager.java | 36 +++-
5 files changed, 264 insertions(+), 77 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
index 720407b854..a61b00be51 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -16,8 +16,10 @@
*/
package org.apache.hadoop.ozone.om;
+import java.time.Duration;
import java.util.List;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
@@ -27,8 +29,8 @@ import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
import org.apache.hadoop.hdds.utils.db.DBProfile;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
@@ -76,11 +78,16 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -108,11 +115,8 @@ public class TestOmSnapshot {
private static boolean enabledFileSystemPaths;
private static boolean forceFullSnapshotDiff;
private static ObjectStore store;
- private static OzoneConfiguration leaderConfig;
- private static OzoneManager leaderOzoneManager;
-
+ private static OzoneManager ozoneManager;
private static RDBStore rdbStore;
-
private static OzoneBucket ozoneBucket;
@Rule
@@ -121,9 +125,9 @@ public class TestOmSnapshot {
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(
- new Object[]{OBJECT_STORE, false, false},
- new Object[]{FILE_SYSTEM_OPTIMIZED, false, false},
- new Object[]{BucketLayout.LEGACY, true, true});
+ new Object[]{OBJECT_STORE, false, false},
+ new Object[]{FILE_SYSTEM_OPTIMIZED, false, false},
+ new Object[]{BucketLayout.LEGACY, true, true});
}
public TestOmSnapshot(BucketLayout newBucketLayout,
@@ -158,22 +162,20 @@ public class TestOmSnapshot {
OzoneConfiguration conf = new OzoneConfiguration();
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
- conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
- enabledFileSystemPaths);
- conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
- bucketLayout.name());
- conf.setBoolean(OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF,
- forceFullSnapshotDiff);
+ String omId = UUID.randomUUID().toString();
+ conf.setBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS, enabledFileSystemPaths);
+ conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, bucketLayout.name());
+ conf.setBoolean(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF, forceFullSnapshotDiff);
conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
// Enable filesystem snapshot feature for the test regardless of the
default
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
- cluster = MiniOzoneCluster.newOMHABuilder(conf)
+ cluster = MiniOzoneCluster.newBuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
- .setOMServiceId("om-service-test1")
- .setNumOfOzoneManagers(3)
+ .setOmId(omId)
.build();
+
cluster.waitForClusterToBeReady();
client = cluster.newClient();
// create a volume and a bucket to be used by OzoneFileSystem
@@ -181,18 +183,14 @@ public class TestOmSnapshot {
.createVolumeAndBucket(client, bucketLayout);
volumeName = ozoneBucket.getVolumeName();
bucketName = ozoneBucket.getName();
-
- leaderOzoneManager = ((MiniOzoneHAClusterImpl) cluster).getOMLeader();
- leaderConfig = leaderOzoneManager.getConfiguration();
- rdbStore =
- (RDBStore) leaderOzoneManager.getMetadataManager().getStore();
- cluster.setConf(leaderConfig);
+ ozoneManager = cluster.getOzoneManager();
+ rdbStore = (RDBStore) ozoneManager.getMetadataManager().getStore();
store = client.getObjectStore();
writeClient = store.getClientProxy().getOzoneManagerClient();
KeyManagerImpl keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
- .getInternalState(leaderOzoneManager, "keyManager");
+ .getInternalState(ozoneManager, "keyManager");
// stop the deletion services so that keys can still be read
keyManager.stop();
@@ -846,12 +844,12 @@ public class TestOmSnapshot {
store.createSnapshot(volName, buckName, snapshotName);
String snapshotKeyPrefix =
OmSnapshotManager.getSnapshotPrefix(snapshotName);
- SnapshotInfo snapshotInfo =
- leaderOzoneManager.getMetadataManager().getSnapshotInfoTable()
- .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName));
+ SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager()
+ .getSnapshotInfoTable()
+ .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName));
String snapshotDirName =
- OmSnapshotManager.getSnapshotPath(leaderConfig, snapshotInfo) +
- OM_KEY_PREFIX + "CURRENT";
+ OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(),
+ snapshotInfo) + OM_KEY_PREFIX + "CURRENT";
GenericTestUtils
.waitFor(() -> new File(snapshotDirName).exists(), 1000, 120000);
return snapshotKeyPrefix;
@@ -875,45 +873,6 @@ public class TestOmSnapshot {
return key;
}
- @Test
- public void testUniqueSnapshotId()
- throws IOException, InterruptedException, TimeoutException {
- createFileKey(ozoneBucket, "key");
-
- String snapshotName = UUID.randomUUID().toString();
- store.createSnapshot(volumeName, bucketName, snapshotName);
- List<OzoneManager> ozoneManagers = ((MiniOzoneHAClusterImpl) cluster)
- .getOzoneManagersList();
- List<String> snapshotIds = new ArrayList<>();
-
- for (OzoneManager ozoneManager : ozoneManagers) {
- GenericTestUtils.waitFor(
- () -> {
- SnapshotInfo snapshotInfo;
- try {
- snapshotInfo = ozoneManager.getMetadataManager()
- .getSnapshotInfoTable()
- .get(
- SnapshotInfo.getTableKey(volumeName,
- bucketName,
- snapshotName)
- );
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- if (snapshotInfo != null) {
- snapshotIds.add(snapshotInfo.getSnapshotID());
- }
- return snapshotInfo != null;
- },
- 1000,
- 120000);
- }
-
- assertEquals(1, snapshotIds.stream().distinct().count());
- }
-
@Test
public void testSnapshotOpensWithDisabledAutoCompaction() throws Exception {
String snapPrefix = createSnapshot(volumeName, bucketName);
@@ -929,6 +888,108 @@ public class TestOmSnapshot {
}
}
+  // Test snapshot diff when the OM restarts in a non-HA OM env while the diff
+  // job is still IN_PROGRESS.
+  @Test
+  public void testSnapshotDiffWhenOmRestart()
+      throws IOException, InterruptedException {
+    // 10 random digits (as in the HA snapshot tests) to avoid snapshot name
+    // collisions, since the bucket is shared by every test in this class.
+    String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10);
+    String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(10);
+    createSnapshots(snapshot1, snapshot2);
+
+    SnapshotDiffResponse response = store.snapshotDiff(volumeName, bucketName,
+        snapshot1, snapshot2, null, 0, false);
+
+    assertEquals(IN_PROGRESS, response.getJobStatus());
+
+    // Restart the OM and wait only until it is running again. This does not
+    // wait for the snapDiff job itself; the status checks below handle both
+    // a finished job and a resubmitted one.
+    cluster.restartOzoneManager();
+    await().atMost(Duration.ofSeconds(120))
+        .until(() -> cluster.getOzoneManager().isRunning());
+
+    response = store.snapshotDiff(volumeName, bucketName,
+        snapshot1, snapshot2, null, 0, false);
+
+    // If the job was IN_PROGRESS or DONE when the OM restarted, it may be DONE
+    // by this time.
+    // If the job FAILED during the restart (which mostly happens in this test
+    // because of active snapshot checks), it is removed by the clean up
+    // service on startup, and the request after clean up is treated as a new
+    // request that returns IN_PROGRESS. No state other than IN_PROGRESS and
+    // DONE is expected.
+    if (response.getJobStatus() == DONE) {
+      assertEquals(100, response.getSnapshotDiffReport().getDiffList().size());
+    } else if (response.getJobStatus() == IN_PROGRESS) {
+      SnapshotDiffReportOzone diffReport =
+          fetchReportPage(snapshot1, snapshot2, null, 0);
+      assertEquals(100, diffReport.getDiffList().size());
+    } else {
+      fail("Unexpected job status for the test.");
+    }
+  }
+
+  // Test snapshot diff when the OM restarts in a non-HA OM env after the
+  // report has been only partially fetched.
+  @Test
+  public void testSnapshotDiffWhenOmRestartAndReportIsPartiallyFetched()
+      throws IOException, InterruptedException {
+    int pageSize = 10;
+    // 10 random digits to avoid snapshot name collisions in the shared bucket.
+    String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10);
+    String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(10);
+    createSnapshots(snapshot1, snapshot2);
+
+    SnapshotDiffReportOzone diffReport = fetchReportPage(snapshot1, snapshot2,
+        null, pageSize);
+
+    // Copy the first page so later pages are not appended into the list
+    // owned by the report object.
+    List<DiffReportEntry> diffReportEntries =
+        new ArrayList<>(diffReport.getDiffList());
+    String nextToken = diffReport.getToken();
+
+    // fetchReportPage() only returns once the job is DONE, so the job has
+    // already finished; only wait for the OM itself to come back up.
+    cluster.restartOzoneManager();
+    await().atMost(Duration.ofSeconds(120))
+        .until(() -> cluster.getOzoneManager().isRunning());
+
+    // A null or empty token means there are no more pages. Treating null as
+    // "more pages" would restart from the first page and never terminate.
+    while (StringUtils.isNotEmpty(nextToken)) {
+      diffReport = fetchReportPage(snapshot1, snapshot2, nextToken, pageSize);
+      diffReportEntries.addAll(diffReport.getDiffList());
+      nextToken = diffReport.getToken();
+    }
+    assertEquals(100, diffReportEntries.size());
+  }
+
+  /**
+   * Polls the snapshot diff API until the job is DONE and returns the
+   * requested report page, sleeping for the server-suggested wait time while
+   * the job is IN_PROGRESS. Any other job status fails the test.
+   */
+  private SnapshotDiffReportOzone fetchReportPage(String fromSnapshot,
+      String toSnapshot, String token, int pageSize)
+      throws IOException, InterruptedException {
+    while (true) {
+      SnapshotDiffResponse diffResponse = store.snapshotDiff(volumeName,
+          bucketName, fromSnapshot, toSnapshot, token, pageSize, false);
+      if (diffResponse.getJobStatus() == DONE) {
+        return diffResponse.getSnapshotDiffReport();
+      }
+      if (diffResponse.getJobStatus() != IN_PROGRESS) {
+        fail("Unexpected job status for the test.");
+      }
+      Thread.sleep(diffResponse.getWaitTimeInMs());
+    }
+  }
+
+ private void createSnapshots(String snapshot1,
+ String snapshot2) throws IOException {
+ createFileKey(ozoneBucket, "key");
+ store.createSnapshot(volumeName, bucketName, snapshot1);
+
+ for (int i = 0; i < 100; i++) {
+ createFileKey(ozoneBucket, "key-" + i);
+ }
+
+ store.createSnapshot(volumeName, bucketName, snapshot2);
+ }
+
@Test
public void testCompactionDagDisableForSnapshotMetadata() throws Exception {
String snapshotName = createSnapshot(volumeName, bucketName);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 6b2c21d7d3..6c3e085866 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -158,6 +158,8 @@ public abstract class TestOzoneManagerHA {
conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
+ // Enable filesystem snapshot feature for the test regardless of the
default
+ conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
// Some subclasses check RocksDB directly as part of their tests. These
// depend on OBS layout.
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
index 19dfe91ea9..960633c32a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -40,10 +41,13 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -91,6 +95,98 @@ public class TestOzoneManagerHASnapshot {
}
}
+  // Test snapshot diff when the OM leader restarts in an HA OM env.
+  @Test
+  public void testSnapshotDiffWhenOmLeaderRestart()
+      throws Exception {
+    String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10);
+    String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(10);
+
+    createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
+    store.createSnapshot(volumeName, bucketName, snapshot1);
+
+    for (int i = 0; i < 100; i++) {
+      createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
+    }
+
+    store.createSnapshot(volumeName, bucketName, snapshot2);
+
+    SnapshotDiffResponse response = store.snapshotDiff(volumeName, bucketName,
+        snapshot1, snapshot2, null, 0, false);
+
+    assertEquals(IN_PROGRESS, response.getJobStatus());
+
+    // Look the leader up once so the node that is restarted is the same node
+    // whose ID is recorded.
+    OzoneManager omLeader = cluster.getOMLeader();
+    String oldLeader = omLeader.getOMNodeId();
+    cluster.shutdownOzoneManager(omLeader);
+    cluster.restartOzoneManager(omLeader, true);
+
+    await().atMost(Duration.ofSeconds(120))
+        .until(() -> cluster.getOMLeader() != null);
+
+    String newLeader = cluster.getOMLeader().getOMNodeId();
+
+    response = store.snapshotDiff(volumeName, bucketName, snapshot1,
+        snapshot2, null, 0, false);
+
+    if (!Objects.equals(oldLeader, newLeader)) {
+      // A different leader has not seen this request before, so it starts a
+      // new job.
+      assertEquals(IN_PROGRESS, response.getJobStatus());
+    }
+
+    // If the old leader is leader again, its job may already be DONE, or it
+    // may have failed during the shutdown so the request is treated as new
+    // and returns IN_PROGRESS. In every case poll until DONE; any status
+    // other than IN_PROGRESS fails fast instead of polling forever.
+    while (response.getJobStatus() != DONE) {
+      assertEquals(IN_PROGRESS, response.getJobStatus());
+      Thread.sleep(response.getWaitTimeInMs());
+      response = store.snapshotDiff(volumeName, bucketName, snapshot1,
+          snapshot2, null, 0, false);
+    }
+    assertEquals(100, response.getSnapshotDiffReport().getDiffList().size());
+  }
+
+  @Test
+  public void testSnapshotIdConsistency() throws Exception {
+    createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
+
+    String snapshotName = "snap-" + RandomStringUtils.randomNumeric(10);
+    store.createSnapshot(volumeName, bucketName, snapshotName);
+
+    // Every OM must eventually see the snapshot, and all of them must agree
+    // on its snapshot ID.
+    String tableKey =
+        SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName);
+    List<String> snapshotIds = new ArrayList<>();
+
+    for (OzoneManager om : cluster.getOzoneManagersList()) {
+      await().atMost(Duration.ofSeconds(120)).until(() -> {
+        final SnapshotInfo info;
+        try {
+          info = om.getMetadataManager().getSnapshotInfoTable().get(tableKey);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        if (info == null) {
+          return false;
+        }
+        snapshotIds.add(info.getSnapshotID());
+        return true;
+      });
+    }
+
+    assertEquals(1, snapshotIds.stream().distinct().count());
+  }
+
/**
* Test snapshotNames are unique among OM nodes when snapshotName is not
* passed or empty.
@@ -146,8 +242,8 @@ public class TestOzoneManagerHASnapshot {
for (int i = 0; i < 100; i++) {
int index = i % 10;
createFileKey(ozoneBuckets.get(index),
- "key-" + RandomStringUtils.randomNumeric(3));
- String snapshot1 = "snapshot-" + RandomStringUtils.randomNumeric(5);
+ "key-" + RandomStringUtils.randomNumeric(10));
+ String snapshot1 = "snapshot-" + RandomStringUtils.randomNumeric(10);
store.createSnapshot(volumeNames.get(index),
bucketNames.get(index), snapshot1);
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 92ee5e5c63..eb5ea0f299 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -830,6 +830,12 @@ public final class OmSnapshotManager implements
AutoCloseable {
@Override
public void close() {
+ if (snapshotDiffManager != null) {
+ snapshotDiffManager.close();
+ }
+ if (snapshotCache != null) {
+ snapshotCache.invalidateAll();
+ }
if (snapshotDiffCleanupService != null) {
snapshotDiffCleanupService.shutdown();
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 4719d06967..5e38ad33a2 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -152,7 +152,8 @@ public class SnapshotDiffManager implements AutoCloseable {
* similar type of request at any point of time.
*/
private final PersistentMap<String, SnapshotDiffJob> snapDiffJobTable;
- private final ExecutorService executorService;
+ private final ExecutorService snapDiffExecutor;
+ private ExecutorService sstDumpToolExecutor;
/**
* Directory to keep hardlinks of SST files for a snapDiff job temporarily.
@@ -213,7 +214,7 @@ public class SnapshotDiffManager implements AutoCloseable {
byte[].class,
byte[].class);
- this.executorService = new ThreadPoolExecutor(threadPoolSize,
+ this.snapDiffExecutor = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize,
0,
TimeUnit.MILLISECONDS,
@@ -264,13 +265,14 @@ public class SnapshotDiffManager implements AutoCloseable
{
OMConfigKeys
.OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT,
StorageUnit.BYTES);
- ExecutorService execService = new ThreadPoolExecutor(0,
+ sstDumpToolExecutor = new ThreadPoolExecutor(0,
threadPoolSize, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadFactoryBuilder()
.setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
.build(),
new ThreadPoolExecutor.DiscardPolicy());
- return Optional.of(new ManagedSSTDumpTool(execService, bufferSize));
+ return Optional.of(new ManagedSSTDumpTool(sstDumpToolExecutor,
+ bufferSize));
} catch (NativeLibraryNotLoadedException e) {
return Optional.empty();
}
@@ -560,7 +562,7 @@ public class SnapshotDiffManager implements AutoCloseable {
// If executor cannot take any more job, remove the job form DB and return
// the Rejected Job status with wait time.
try {
- executorService.execute(() -> generateSnapshotDiffReport(jobKey, jobId,
+ snapDiffExecutor.execute(() -> generateSnapshotDiffReport(jobKey, jobId,
volumeName, bucketName, fromSnapshotName, toSnapshotName,
forceFullDiff));
updateJobStatus(jobKey, QUEUED, IN_PROGRESS);
@@ -1261,9 +1263,29 @@ public class SnapshotDiffManager implements
AutoCloseable {
}
@Override
- public void close() throws Exception {
+  public void close() {
+    // Stop the snapDiff job executor first, then the SST dump tool executor.
+    // closeExecutorService() is a no-op for a null executor, which matters
+    // for sstDumpToolExecutor: it is not final and is only assigned while
+    // setting up the SST dump tool.
+    closeExecutorService(snapDiffExecutor, "SnapDiffExecutor");
+    closeExecutorService(sstDumpToolExecutor, "SstDumpToolExecutor");
+  }
+
+  /**
+   * Stops the given executor, interrupting any running tasks, and waits up
+   * to 60 seconds for it to terminate. Logs a warning if it does not.
+   *
+   * @param executorService executor to stop; ignored when null
+   * @param serviceName name used in the log messages
+   */
+  private void closeExecutorService(ExecutorService executorService,
+      String serviceName) {
if (executorService != null) {
- executorService.shutdown();
+      LOG.info("Shutting down executorService: '{}'", serviceName);
+      // shutdownNow() interrupts in-flight tasks rather than letting them run
+      // to completion (presumably so close() does not wait on long diff jobs).
+      executorService.shutdownNow();
+      try {
+        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOG.warn("ExecutorService '{}' did not terminate within 60 seconds.",
+              serviceName);
+          // Interrupt again in case a task cleared its interrupt flag.
+          executorService.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt status and make sure tasks are interrupted.
+        Thread.currentThread().interrupt();
+        executorService.shutdownNow();
+      }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]