This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a8cc7b5a9 HDDS-8389. [Snapshot] Added integration test for SnapDiff when OM leader failover happens (#4657)
5a8cc7b5a9 is described below

commit 5a8cc7b5a9c6a830b13f47ff427346dc8ce34945
Author: Hemant Kumar <[email protected]>
AuthorDate: Tue May 23 16:54:18 2023 -0700

    HDDS-8389. [Snapshot] Added integration test for SnapDiff when OM leader failover happens (#4657)
---
 .../org/apache/hadoop/ozone/om/TestOmSnapshot.java | 197 ++++++++++++++-------
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |   2 +
 .../ozone/om/TestOzoneManagerHASnapshot.java       | 100 ++++++++++-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   6 +
 .../ozone/om/snapshot/SnapshotDiffManager.java     |  36 +++-
 5 files changed, 264 insertions(+), 77 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
index 720407b854..a61b00be51 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -16,8 +16,10 @@
  */
 
 package org.apache.hadoop.ozone.om;
+import java.time.Duration;
 import java.util.List;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
@@ -27,8 +29,8 @@ import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
 import org.apache.hadoop.hdds.utils.db.DBProfile;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
@@ -76,11 +78,16 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
 import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -108,11 +115,8 @@ public class TestOmSnapshot {
   private static boolean enabledFileSystemPaths;
   private static boolean forceFullSnapshotDiff;
   private static ObjectStore store;
-  private static OzoneConfiguration leaderConfig;
-  private static OzoneManager leaderOzoneManager;
-
+  private static OzoneManager ozoneManager;
   private static RDBStore rdbStore;
-
   private static OzoneBucket ozoneBucket;
 
   @Rule
@@ -121,9 +125,9 @@ public class TestOmSnapshot {
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
     return Arrays.asList(
-                         new Object[]{OBJECT_STORE, false, false},
-                         new Object[]{FILE_SYSTEM_OPTIMIZED, false, false},
-                         new Object[]{BucketLayout.LEGACY, true, true});
+        new Object[]{OBJECT_STORE, false, false},
+        new Object[]{FILE_SYSTEM_OPTIMIZED, false, false},
+        new Object[]{BucketLayout.LEGACY, true, true});
   }
 
   public TestOmSnapshot(BucketLayout newBucketLayout,
@@ -158,22 +162,20 @@ public class TestOmSnapshot {
     OzoneConfiguration conf = new OzoneConfiguration();
     String clusterId = UUID.randomUUID().toString();
     String scmId = UUID.randomUUID().toString();
-    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
-        enabledFileSystemPaths);
-    conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
-        bucketLayout.name());
-    conf.setBoolean(OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF,
-        forceFullSnapshotDiff);
+    String omId = UUID.randomUUID().toString();
+    conf.setBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS, enabledFileSystemPaths);
+    conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, bucketLayout.name());
+    conf.setBoolean(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF, forceFullSnapshotDiff);
     conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
     // Enable filesystem snapshot feature for the test regardless of the 
default
     conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
 
-    cluster = MiniOzoneCluster.newOMHABuilder(conf)
+    cluster = MiniOzoneCluster.newBuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
-        .setOMServiceId("om-service-test1")
-        .setNumOfOzoneManagers(3)
+        .setOmId(omId)
         .build();
+
     cluster.waitForClusterToBeReady();
     client = cluster.newClient();
     // create a volume and a bucket to be used by OzoneFileSystem
@@ -181,18 +183,14 @@ public class TestOmSnapshot {
         .createVolumeAndBucket(client, bucketLayout);
     volumeName = ozoneBucket.getVolumeName();
     bucketName = ozoneBucket.getName();
-
-    leaderOzoneManager = ((MiniOzoneHAClusterImpl) cluster).getOMLeader();
-    leaderConfig = leaderOzoneManager.getConfiguration();
-    rdbStore =
-        (RDBStore) leaderOzoneManager.getMetadataManager().getStore();
-    cluster.setConf(leaderConfig);
+    ozoneManager = cluster.getOzoneManager();
+    rdbStore = (RDBStore) ozoneManager.getMetadataManager().getStore();
 
     store = client.getObjectStore();
     writeClient = store.getClientProxy().getOzoneManagerClient();
 
     KeyManagerImpl keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
-        .getInternalState(leaderOzoneManager, "keyManager");
+        .getInternalState(ozoneManager, "keyManager");
 
     // stop the deletion services so that keys can still be read
     keyManager.stop();
@@ -846,12 +844,12 @@ public class TestOmSnapshot {
     store.createSnapshot(volName, buckName, snapshotName);
     String snapshotKeyPrefix =
         OmSnapshotManager.getSnapshotPrefix(snapshotName);
-    SnapshotInfo snapshotInfo =
-        leaderOzoneManager.getMetadataManager().getSnapshotInfoTable()
-            .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName));
+    SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager()
+        .getSnapshotInfoTable()
+        .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName));
     String snapshotDirName =
-        OmSnapshotManager.getSnapshotPath(leaderConfig, snapshotInfo) +
-        OM_KEY_PREFIX + "CURRENT";
+        OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(),
+            snapshotInfo) + OM_KEY_PREFIX + "CURRENT";
     GenericTestUtils
         .waitFor(() -> new File(snapshotDirName).exists(), 1000, 120000);
     return snapshotKeyPrefix;
@@ -875,45 +873,6 @@ public class TestOmSnapshot {
     return key;
   }
 
-  @Test
-  public void testUniqueSnapshotId()
-      throws IOException, InterruptedException, TimeoutException {
-    createFileKey(ozoneBucket, "key");
-
-    String snapshotName = UUID.randomUUID().toString();
-    store.createSnapshot(volumeName, bucketName, snapshotName);
-    List<OzoneManager> ozoneManagers = ((MiniOzoneHAClusterImpl) cluster)
-        .getOzoneManagersList();
-    List<String> snapshotIds = new ArrayList<>();
-
-    for (OzoneManager ozoneManager : ozoneManagers) {
-      GenericTestUtils.waitFor(
-          () -> {
-            SnapshotInfo snapshotInfo;
-            try {
-              snapshotInfo = ozoneManager.getMetadataManager()
-                  .getSnapshotInfoTable()
-                  .get(
-                      SnapshotInfo.getTableKey(volumeName,
-                          bucketName,
-                          snapshotName)
-                  );
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-
-            if (snapshotInfo != null) {
-              snapshotIds.add(snapshotInfo.getSnapshotID());
-            }
-            return snapshotInfo != null;
-          },
-          1000,
-          120000);
-    }
-
-    assertEquals(1, snapshotIds.stream().distinct().count());
-  }
-
   @Test
   public void testSnapshotOpensWithDisabledAutoCompaction() throws Exception {
     String snapPrefix = createSnapshot(volumeName, bucketName);
@@ -929,6 +888,108 @@ public class TestOmSnapshot {
     }
   }
 
+  // Test snapshot diff when OM restarts in non-HA OM env and diff job is
+  // in_progress when it restarts.
+  @Test
+  public void testSnapshotDiffWhenOmRestart()
+      throws IOException, InterruptedException {
+    String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(5);
+    String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(5);
+    createSnapshots(snapshot1, snapshot2);
+
+    SnapshotDiffResponse response = store.snapshotDiff(volumeName, bucketName,
+        snapshot1, snapshot2, null, 0, false);
+
+    assertEquals(IN_PROGRESS, response.getJobStatus());
+
+    // Restart the OM and wait for sometime to make sure that previous snapDiff
+    // job finishes.
+    cluster.restartOzoneManager();
+    await().atMost(Duration.ofSeconds(120)).
+        until(() -> cluster.getOzoneManager().isRunning());
+
+    response = store.snapshotDiff(volumeName, bucketName,
+        snapshot1, snapshot2, null, 0, false);
+
+    // If job was IN_PROGRESS or DONE state when OM restarted, it should be
+    // DONE by this time.
+    // If job FAILED during crash (which mostly happens in the test because
+    // of active snapshot checks), it would be removed by clean up service on
+    // startup, and request after clean up will be considered a new request
+    // and would return IN_PROGRESS. No other state is expected other than
+    // IN_PROGRESS and DONE.
+    if (response.getJobStatus() == DONE) {
+      assertEquals(100, response.getSnapshotDiffReport().getDiffList().size());
+    } else if (response.getJobStatus() == IN_PROGRESS) {
+      SnapshotDiffReportOzone diffReport =
+          fetchReportPage(snapshot1, snapshot2, null, 0);
+      assertEquals(100, diffReport.getDiffList().size());
+    } else {
+      fail("Unexpected job status for the test.");
+    }
+  }
+
+  // Test snapshot diff when OM restarts in non-HA OM env and report is
+  // partially received.
+  @Test
+  public void testSnapshotDiffWhenOmRestartAndReportIsPartiallyFetched()
+      throws IOException, InterruptedException {
+    int pageSize = 10;
+    String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(5);
+    String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(5);
+    createSnapshots(snapshot1, snapshot2);
+
+    SnapshotDiffReportOzone diffReport = fetchReportPage(snapshot1, snapshot2,
+        null, pageSize);
+
+    List<DiffReportEntry> diffReportEntries = diffReport.getDiffList();
+    String nextToken = diffReport.getToken();
+
+    // Restart the OM and no need to wait because snapDiff job finished before
+    // the restart.
+    cluster.restartOzoneManager();
+    await().atMost(Duration.ofSeconds(120)).
+        until(() -> cluster.getOzoneManager().isRunning());
+
+    while (nextToken == null || StringUtils.isNotEmpty(nextToken)) {
+      diffReport = fetchReportPage(snapshot1, snapshot2, nextToken, pageSize);
+      diffReportEntries.addAll(diffReport.getDiffList());
+      nextToken = diffReport.getToken();
+    }
+    assertEquals(100, diffReportEntries.size());
+  }
+
+  private SnapshotDiffReportOzone fetchReportPage(String fromSnapshot,
+                                                  String toSnapshot,
+                                                  String token,
+                                                  int pageSize)
+      throws IOException, InterruptedException {
+
+    while (true) {
+      SnapshotDiffResponse response = store.snapshotDiff(volumeName, 
bucketName,
+          fromSnapshot, toSnapshot, token, pageSize, false);
+      if (response.getJobStatus() == IN_PROGRESS) {
+        Thread.sleep(response.getWaitTimeInMs());
+      } else if (response.getJobStatus() == DONE) {
+        return response.getSnapshotDiffReport();
+      } else {
+        fail("Unexpected job status for the test.");
+      }
+    }
+  }
+
+  private void createSnapshots(String snapshot1,
+                               String snapshot2) throws IOException {
+    createFileKey(ozoneBucket, "key");
+    store.createSnapshot(volumeName, bucketName, snapshot1);
+
+    for (int i = 0; i < 100; i++) {
+      createFileKey(ozoneBucket, "key-" + i);
+    }
+
+    store.createSnapshot(volumeName, bucketName, snapshot2);
+  }
+
   @Test
   public void testCompactionDagDisableForSnapshotMetadata() throws Exception {
     String snapshotName = createSnapshot(volumeName, bucketName);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 6b2c21d7d3..6c3e085866 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -158,6 +158,8 @@ public abstract class TestOzoneManagerHA {
     conf.setLong(
         OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
         SNAPSHOT_THRESHOLD);
+    // Enable filesystem snapshot feature for the test regardless of the 
default
+    conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
 
     // Some subclasses check RocksDB directly as part of their tests. These
     // depend on OBS layout.
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
index 19dfe91ea9..960633c32a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -40,10 +41,13 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -91,6 +95,98 @@ public class TestOzoneManagerHASnapshot {
     }
   }
 
+  // Test snapshot diff when OM restarts in HA OM env.
+  @Test
+  public void testSnapshotDiffWhenOmLeaderRestart()
+      throws Exception {
+    String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10);
+    String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(10);
+
+    createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
+    store.createSnapshot(volumeName, bucketName, snapshot1);
+
+    for (int i = 0; i < 100; i++) {
+      createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
+    }
+
+    store.createSnapshot(volumeName, bucketName, snapshot2);
+
+    SnapshotDiffResponse response =
+        store.snapshotDiff(volumeName, bucketName,
+            snapshot1, snapshot2, null, 0, false);
+
+    assertEquals(IN_PROGRESS, response.getJobStatus());
+
+    String oldLeader = cluster.getOMLeader().getOMNodeId();
+
+    OzoneManager omLeader = cluster.getOMLeader();
+    cluster.shutdownOzoneManager(omLeader);
+    cluster.restartOzoneManager(omLeader, true);
+
+    await().atMost(Duration.ofSeconds(120))
+        .until(() -> cluster.getOMLeader() != null);
+
+    String newLeader = cluster.getOMLeader().getOMNodeId();
+
+    if (Objects.equals(oldLeader, newLeader)) {
+      // If old leader becomes leader again. Job should be done by this time.
+      response = store.snapshotDiff(volumeName, bucketName,
+          snapshot1, snapshot2, null, 0, false);
+      assertEquals(DONE, response.getJobStatus());
+      assertEquals(100, response.getSnapshotDiffReport().getDiffList().size());
+    } else {
+      // If new leader is different from old leader. SnapDiff request will be
+      // new to OM, and job status should be IN_PROGRESS.
+      response = store.snapshotDiff(volumeName, bucketName, snapshot1,
+          snapshot2, null, 0, false);
+      assertEquals(IN_PROGRESS, response.getJobStatus());
+      while (true) {
+        response = store.snapshotDiff(volumeName, bucketName, snapshot1,
+                snapshot2, null, 0, false);
+        if (DONE == response.getJobStatus()) {
+          assertEquals(100,
+              response.getSnapshotDiffReport().getDiffList().size());
+          break;
+        }
+        Thread.sleep(response.getWaitTimeInMs());
+      }
+    }
+  }
+
+  @Test
+  public void testSnapshotIdConsistency() throws Exception {
+    createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));
+
+    String snapshotName = "snap-" + RandomStringUtils.randomNumeric(10);
+
+    store.createSnapshot(volumeName, bucketName, snapshotName);
+    List<OzoneManager> ozoneManagers = cluster.getOzoneManagersList();
+    List<String> snapshotIds = new ArrayList<>();
+
+    for (OzoneManager ozoneManager : ozoneManagers) {
+      await().atMost(Duration.ofSeconds(120))
+          .until(() -> {
+            SnapshotInfo snapshotInfo;
+            try {
+              snapshotInfo = ozoneManager.getMetadataManager()
+                  .getSnapshotInfoTable()
+                  .get(SnapshotInfo.getTableKey(volumeName,
+                      bucketName,
+                      snapshotName));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+
+            if (snapshotInfo != null) {
+              snapshotIds.add(snapshotInfo.getSnapshotID());
+            }
+            return snapshotInfo != null;
+          });
+    }
+
+    assertEquals(1, snapshotIds.stream().distinct().count());
+  }
+
   /**
    * Test snapshotNames are unique among OM nodes when snapshotName is not
    * passed or empty.
@@ -146,8 +242,8 @@ public class TestOzoneManagerHASnapshot {
     for (int i = 0; i < 100; i++) {
       int index = i % 10;
       createFileKey(ozoneBuckets.get(index),
-          "key-" + RandomStringUtils.randomNumeric(3));
-      String snapshot1 = "snapshot-" + RandomStringUtils.randomNumeric(5);
+          "key-" + RandomStringUtils.randomNumeric(10));
+      String snapshot1 = "snapshot-" + RandomStringUtils.randomNumeric(10);
       store.createSnapshot(volumeNames.get(index),
           bucketNames.get(index), snapshot1);
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 92ee5e5c63..eb5ea0f299 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -830,6 +830,12 @@ public final class OmSnapshotManager implements 
AutoCloseable {
 
   @Override
   public void close() {
+    if (snapshotDiffManager != null) {
+      snapshotDiffManager.close();
+    }
+    if (snapshotCache != null) {
+      snapshotCache.invalidateAll();
+    }
     if (snapshotDiffCleanupService != null) {
       snapshotDiffCleanupService.shutdown();
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 4719d06967..5e38ad33a2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -152,7 +152,8 @@ public class SnapshotDiffManager implements AutoCloseable {
    * similar type of request at any point of time.
    */
   private final PersistentMap<String, SnapshotDiffJob> snapDiffJobTable;
-  private final ExecutorService executorService;
+  private final ExecutorService snapDiffExecutor;
+  private ExecutorService sstDumpToolExecutor;
 
   /**
    * Directory to keep hardlinks of SST files for a snapDiff job temporarily.
@@ -213,7 +214,7 @@ public class SnapshotDiffManager implements AutoCloseable {
         byte[].class,
         byte[].class);
 
-    this.executorService = new ThreadPoolExecutor(threadPoolSize,
+    this.snapDiffExecutor = new ThreadPoolExecutor(threadPoolSize,
         threadPoolSize,
         0,
         TimeUnit.MILLISECONDS,
@@ -264,13 +265,14 @@ public class SnapshotDiffManager implements AutoCloseable 
{
           OMConfigKeys
               .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT,
               StorageUnit.BYTES);
-      ExecutorService execService = new ThreadPoolExecutor(0,
+      sstDumpToolExecutor = new ThreadPoolExecutor(0,
               threadPoolSize, 60, TimeUnit.SECONDS,
               new SynchronousQueue<>(), new ThreadFactoryBuilder()
               .setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
               .build(),
               new ThreadPoolExecutor.DiscardPolicy());
-      return Optional.of(new ManagedSSTDumpTool(execService, bufferSize));
+      return Optional.of(new ManagedSSTDumpTool(sstDumpToolExecutor,
+          bufferSize));
     } catch (NativeLibraryNotLoadedException e) {
       return Optional.empty();
     }
@@ -560,7 +562,7 @@ public class SnapshotDiffManager implements AutoCloseable {
     // If executor cannot take any more job, remove the job form DB and return
     // the Rejected Job status with wait time.
     try {
-      executorService.execute(() -> generateSnapshotDiffReport(jobKey, jobId,
+      snapDiffExecutor.execute(() -> generateSnapshotDiffReport(jobKey, jobId,
           volumeName, bucketName, fromSnapshotName, toSnapshotName,
           forceFullDiff));
       updateJobStatus(jobKey, QUEUED, IN_PROGRESS);
@@ -1261,9 +1263,29 @@ public class SnapshotDiffManager implements 
AutoCloseable {
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
+    if (snapDiffExecutor != null) {
+      closeExecutorService(snapDiffExecutor, "SnapDiffExecutor");
+    }
+    if (sstDumpToolExecutor != null) {
+      closeExecutorService(sstDumpToolExecutor, "SstDumpToolExecutor");
+    }
+  }
+
+  private void closeExecutorService(ExecutorService executorService,
+                                    String serviceName) {
     if (executorService != null) {
-      executorService.shutdown();
+      LOG.info("Shutting down executorService: '{}'", serviceName);
+      executorService.shutdownNow();
+      try {
+        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+          executorService.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        // Re-interrupt the thread while catching InterruptedException
+        Thread.currentThread().interrupt();
+        executorService.shutdownNow();
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to