This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f21940c9e5 HDDS-8490. [Snapshot] Ability to cancel an in-progress
snapdiff job (#4819)
f21940c9e5 is described below
commit f21940c9e564e32a39ca5e73d50173e28c559da0
Author: Christos Bisias <[email protected]>
AuthorDate: Wed Jun 21 18:50:43 2023 +0300
HDDS-8490. [Snapshot] Ability to cancel an in-progress snapdiff job (#4819)
---
.../apache/hadoop/ozone/client/ObjectStore.java | 7 +-
.../ozone/client/protocol/ClientProtocol.java | 4 +-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 5 +-
.../ozone/om/protocol/OzoneManagerProtocol.java | 5 +-
...OzoneManagerProtocolClientSideTranslatorPB.java | 10 +-
.../ozone/snapshot/SnapshotDiffResponse.java | 70 ++++-
.../org/apache/hadoop/ozone/om/TestOmSnapshot.java | 159 +++++++++-
.../ozone/om/TestOzoneManagerHASnapshot.java | 8 +-
.../src/main/proto/OmClientProtocol.proto | 13 +
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 19 ++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 13 +-
.../om/service/SnapshotDiffCleanupService.java | 15 +-
.../ozone/om/snapshot/SnapshotDiffManager.java | 199 +++++++++---
.../protocolPB/OzoneManagerRequestHandler.java | 6 +-
.../ozone/om/snapshot/TestSnapshotDiffManager.java | 333 ++++++++++++++++++++-
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 2 +-
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 2 +-
.../hadoop/ozone/client/ClientProtocolStub.java | 3 +-
.../ozone/shell/snapshot/SnapshotDiffHandler.java | 11 +-
19 files changed, 788 insertions(+), 96 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
index b7cd9e676f..9ab17f9676 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
@@ -667,18 +667,21 @@ public class ObjectStore {
* @param token to get the index to return diff report from.
* @param pageSize maximum entries returned to the report.
* @param forceFullDiff request to force full diff, skipping DAG optimization
+ * @param cancel request to cancel a running snapshot diff job.
* @return the difference report between two snapshots
* @throws IOException in case of any exception while generating snapshot
diff
*/
+ @SuppressWarnings("parameternumber")
public SnapshotDiffResponse snapshotDiff(String volumeName,
String bucketName,
String fromSnapshot,
String toSnapshot,
String token,
int pageSize,
- boolean forceFullDiff)
+ boolean forceFullDiff,
+ boolean cancel)
throws IOException {
return proxy.snapshotDiff(volumeName, bucketName, fromSnapshot, toSnapshot,
- token, pageSize, forceFullDiff);
+ token, pageSize, forceFullDiff, cancel);
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 7567bcee5f..b7bba4c2cd 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -1067,13 +1067,15 @@ public interface ClientProtocol {
* @param token to get the index to return diff report from.
* @param pageSize maximum entries returned to the report.
* @param forceFullDiff request to force full diff, skipping DAG optimization
+ * @param cancel request to cancel a running snapshot diff job.
* @return the difference report between two snapshots
* @throws IOException in case of any exception while generating snapshot
diff
*/
+ @SuppressWarnings("parameternumber")
SnapshotDiffResponse snapshotDiff(String volumeName, String bucketName,
String fromSnapshot, String toSnapshot,
String token, int pageSize,
- boolean forceFullDiff)
+ boolean forceFullDiff, boolean cancel)
throws IOException;
/**
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ccb53c255a..f8fd36d465 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -985,14 +985,15 @@ public class RpcClient implements ClientProtocol {
String toSnapshot,
String token,
int pageSize,
- boolean forceFullDiff)
+ boolean forceFullDiff,
+ boolean cancel)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotBlank(volumeName),
"volume can't be null or empty.");
Preconditions.checkArgument(StringUtils.isNotBlank(bucketName),
"bucket can't be null or empty.");
return ozoneManagerClient.snapshotDiff(volumeName, bucketName,
- fromSnapshot, toSnapshot, token, pageSize, forceFullDiff);
+ fromSnapshot, toSnapshot, token, pageSize, forceFullDiff, cancel);
}
/**
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index e28bf314b0..e4e38b1377 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -714,16 +714,19 @@ public interface OzoneManagerProtocol
* @param token to get the index to return diff report from.
* @param pageSize maximum entries returned to the report.
* @param forceFullDiff request to force full diff, skipping DAG optimization
+ * @param cancel request to cancel a running snapshot diff job.
* @return the difference report between two snapshots
* @throws IOException in case of any exception while generating snapshot
diff
*/
+ @SuppressWarnings("parameternumber")
default SnapshotDiffResponse snapshotDiff(String volumeName,
String bucketName,
String fromSnapshot,
String toSnapshot,
String token,
int pageSize,
- boolean forceFullDiff)
+ boolean forceFullDiff,
+ boolean cancel)
throws IOException {
throw new UnsupportedOperationException("OzoneManager does not require " +
"this to be implemented");
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 2d623470c9..b789988dcc 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -198,6 +198,7 @@ import
org.apache.hadoop.ozone.security.proto.SecurityProtos.RenewDelegationToke
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.apache.hadoop.security.token.Token;
@@ -1215,7 +1216,8 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
String toSnapshot,
String token,
int pageSize,
- boolean forceFullDiff)
+ boolean forceFullDiff,
+ boolean cancel)
throws IOException {
final OzoneManagerProtocolProtos.SnapshotDiffRequest.Builder
requestBuilder =
@@ -1225,7 +1227,8 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
.setFromSnapshot(fromSnapshot)
.setToSnapshot(toSnapshot)
.setPageSize(pageSize)
- .setForceFullDiff(forceFullDiff);
+ .setForceFullDiff(forceFullDiff)
+ .setCancel(cancel);
if (!StringUtils.isBlank(token)) {
requestBuilder.setToken(token);
@@ -1242,7 +1245,8 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
return new SnapshotDiffResponse(SnapshotDiffReportOzone.fromProtobuf(
diffResponse.getSnapshotDiffReport()),
JobStatus.fromProtobuf(diffResponse.getJobStatus()),
- diffResponse.getWaitTimeInMs());
+ diffResponse.getWaitTimeInMs(),
+ JobCancelResult.fromProtobuf(diffResponse.getJobCancelResult()));
}
/**
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java
index 6cd9bff039..f79ad36eeb 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffResponse.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.snapshot;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffResponse.JobStatusProto;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffResponse.JobCancelResultProto;
/**
* POJO for Snapshot Diff Response.
@@ -32,7 +33,8 @@ public class SnapshotDiffResponse {
IN_PROGRESS,
DONE,
REJECTED,
- FAILED;
+ FAILED,
+ CANCELLED;
public JobStatusProto toProtobuf() {
return JobStatusProto.valueOf(this.name());
@@ -43,9 +45,41 @@ public class SnapshotDiffResponse {
}
}
+ /**
+ * Snapshot diff cancel result enum.
+ */
+ public enum JobCancelResult {
+ JOB_NOT_CANCELLED("Job hasn't been cancelled"),
+ NEW_JOB("Cannot cancel a newly submitted job"),
+ JOB_DONE("Job is already DONE"),
+ INVALID_STATUS_TRANSITION("Job is not IN_PROGRESS, cancel failed"),
+ JOB_ALREADY_CANCELLED("Job has already been cancelled"),
+ CANCELLATION_SUCCESS("Job has successfully been cancelled");
+
+ private final String description;
+
+ JobCancelResult(String description) {
+ this.description = description;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public JobCancelResultProto toProtobuf() {
+ return JobCancelResultProto.valueOf(this.name());
+ }
+
+ public static JobCancelResult fromProtobuf(
+ JobCancelResultProto jobCancelResultProto) {
+ return JobCancelResult.valueOf(jobCancelResultProto.name());
+ }
+ }
+
private final SnapshotDiffReportOzone snapshotDiffReport;
private final JobStatus jobStatus;
private final long waitTimeInMs;
+ private final JobCancelResult jobCancelResult;
public SnapshotDiffResponse(final SnapshotDiffReportOzone snapshotDiffReport,
final JobStatus jobStatus,
@@ -53,6 +87,17 @@ public class SnapshotDiffResponse {
this.snapshotDiffReport = snapshotDiffReport;
this.jobStatus = jobStatus;
this.waitTimeInMs = waitTimeInMs;
+ this.jobCancelResult = JobCancelResult.JOB_NOT_CANCELLED;
+ }
+
+ public SnapshotDiffResponse(final SnapshotDiffReportOzone snapshotDiffReport,
+ final JobStatus jobStatus,
+ final long waitTimeInMs,
+ final JobCancelResult jobCancelResult) {
+ this.snapshotDiffReport = snapshotDiffReport;
+ this.jobStatus = jobStatus;
+ this.waitTimeInMs = waitTimeInMs;
+ this.jobCancelResult = jobCancelResult;
}
public SnapshotDiffReportOzone getSnapshotDiffReport() {
@@ -67,18 +112,27 @@ public class SnapshotDiffResponse {
return waitTimeInMs;
}
+ public JobCancelResult getJobCancelResult() {
+ return jobCancelResult;
+ }
+
@Override
public String toString() {
StringBuilder str = new StringBuilder();
- if (jobStatus == JobStatus.DONE) {
- str.append(snapshotDiffReport.toString());
+ if (jobCancelResult == JobCancelResult.JOB_NOT_CANCELLED ||
+ jobCancelResult == JobCancelResult.CANCELLATION_SUCCESS) {
+ if (jobStatus == JobStatus.DONE) {
+ str.append(snapshotDiffReport.toString());
+ } else {
+ str.append("Snapshot diff job is ");
+ str.append(jobStatus);
+ str.append(". Please retry after ");
+ str.append(waitTimeInMs);
+ str.append(" ms.\n");
+ }
} else {
- str.append("Snapshot diff job is ");
- str.append(jobStatus);
+ str.append(jobCancelResult.getDescription());
str.append("\n");
- str.append("Please retry after ");
- str.append(waitTimeInMs);
- str.append(" ms.");
}
return str.toString();
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
index 6160f52350..32278636fe 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
@@ -86,12 +87,19 @@ import static
org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION_PRIOR_FINALIZATION;
import static
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult.CANCELLATION_SUCCESS;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult.JOB_ALREADY_CANCELLED;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult.JOB_NOT_CANCELLED;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult.NEW_JOB;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.CANCELLED;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED;
import static org.awaitility.Awaitility.with;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
import static org.awaitility.Awaitility.await;
@@ -234,7 +242,7 @@ public class TestOmSnapshot {
store.snapshotDiff(volumeName, bucketName,
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
- "", 1000, false));
+ "", 1000, false, false));
expectFailurePreFinalization(() ->
store.deleteSnapshot(volumeName, bucketName,
UUID.randomUUID().toString()));
@@ -631,6 +639,127 @@ public class TestOmSnapshot {
}
+ @Test
+ public void testSnapDiffCancel() throws Exception {
+ // Create key1 and take snapshot.
+ String key1 = "key-1-" + RandomStringUtils.randomNumeric(5);
+ createFileKey(ozoneBucket, key1);
+ String fromSnapName = "snap-1-" + RandomStringUtils.randomNumeric(5);
+ createSnapshot(volumeName, bucketName, fromSnapName);
+
+ // Create key2 and take snapshot.
+ String key2 = "key-2-" + RandomStringUtils.randomNumeric(5);
+ createFileKey(ozoneBucket, key2);
+ String toSnapName = "snap-2-" + RandomStringUtils.randomNumeric(5);
+ createSnapshot(volumeName, bucketName, toSnapName);
+
+ SnapshotDiffResponse response = store.snapshotDiff(
+ volumeName, bucketName, fromSnapName, toSnapName,
+ null, 0, false, false);
+
+ assertEquals(IN_PROGRESS, response.getJobStatus());
+
+ response = store.snapshotDiff(volumeName,
+ bucketName, fromSnapName, toSnapName,
+ null, 0, false, true);
+
+ // Job status should be updated to CANCELLED.
+ assertEquals(CANCELLED, response.getJobStatus());
+
+ // Executing the command again should return the
+ // JOB_ALREADY_CANCELLED JobCancelResult,
+ // but the job status should still be CANCELLED
+ // until the job is picked up by the SnapshotDiffCleanupService
+ // and removed from the snapDiffJobTable.
+ response = store.snapshotDiff(volumeName,
+ bucketName, fromSnapName, toSnapName,
+ null, 0, false, true);
+ assertEquals(CANCELLED, response.getJobStatus());
+ assertEquals(JOB_ALREADY_CANCELLED, response.getJobCancelResult());
+
+ String fromSnapshotTableKey = SnapshotInfo
+ .getTableKey(volumeName, bucketName, fromSnapName);
+ String toSnapshotTableKey = SnapshotInfo
+ .getTableKey(volumeName, bucketName, toSnapName);
+
+ UUID fromSnapshotID = ozoneManager.getOmSnapshotManager()
+ .getSnapshotInfo(fromSnapshotTableKey).getSnapshotId();
+ UUID toSnapshotID = ozoneManager.getOmSnapshotManager()
+ .getSnapshotInfo(toSnapshotTableKey).getSnapshotId();
+
+ // Construct SnapshotDiffJob table key.
+ String snapDiffJobKey = fromSnapshotID + DELIMITER + toSnapshotID;
+
+ // Get the job from the SnapDiffJobTable, in order to get its ID.
+ String jobID = ozoneManager.getOmSnapshotManager()
+ .getSnapshotDiffManager().getSnapDiffJobTable()
+ .get(snapDiffJobKey).getJobId();
+
+ SnapshotDiffCleanupService snapshotDiffCleanupService =
+ ozoneManager.getOmSnapshotManager().getSnapshotDiffCleanupService();
+
+ // Run SnapshotDiffCleanupService.
+ snapshotDiffCleanupService.run();
+ // Verify that after running the cleanup service,
+ // job exists in the purged job table.
+
assertNotNull(snapshotDiffCleanupService.getEntryFromPurgedJobTable(jobID));
+ }
+
+ @Test
+ public void testSnapDiffCancelFailureResponses() throws Exception {
+ // Create key1 and take snapshot.
+ String key1 = "key-1-" + RandomStringUtils.randomNumeric(5);
+ createFileKey(ozoneBucket, key1);
+ String fromSnapName = "snap-1-" + RandomStringUtils.randomNumeric(5);
+ createSnapshot(volumeName, bucketName, fromSnapName);
+
+ // Create key2 and take snapshot.
+ String key2 = "key-2-" + RandomStringUtils.randomNumeric(5);
+ createFileKey(ozoneBucket, key2);
+ String toSnapName = "snap-2-" + RandomStringUtils.randomNumeric(5);
+ createSnapshot(volumeName, bucketName, toSnapName);
+
+ // New job that doesn't exist, cancel fails.
+ SnapshotDiffResponse response = store.snapshotDiff(
+ volumeName, bucketName, fromSnapName, toSnapName,
+ null, 0, false, true);
+
+ Assert.assertEquals(NEW_JOB, response.getJobCancelResult());
+ Assert.assertTrue(response.toString()
+ .contains(NEW_JOB.getDescription()));
+ Assert.assertEquals(QUEUED, response.getJobStatus());
+
+ // Submit new job.
+ response = store.snapshotDiff(volumeName,
+ bucketName, fromSnapName, toSnapName,
+ null, 0, false, false);
+
+ Assert.assertEquals(JOB_NOT_CANCELLED, response.getJobCancelResult());
+ Assert.assertEquals(IN_PROGRESS, response.getJobStatus());
+ Assert.assertTrue(response.toString()
+ .contains("Snapshot diff job is " + IN_PROGRESS));
+
+ // Cancel success.
+ response = store.snapshotDiff(volumeName,
+ bucketName, fromSnapName, toSnapName,
+ null, 0, false, true);
+
+ Assert.assertEquals(CANCELLATION_SUCCESS, response.getJobCancelResult());
+ Assert.assertEquals(CANCELLED, response.getJobStatus());
+ Assert.assertTrue(response.toString()
+ .contains("Snapshot diff job is " + CANCELLED));
+
+ // Job already cancelled.
+ response = store.snapshotDiff(volumeName,
+ bucketName, fromSnapName, toSnapName,
+ null, 0, false, true);
+
+ Assert.assertEquals(JOB_ALREADY_CANCELLED, response.getJobCancelResult());
+ Assert.assertEquals(CANCELLED, response.getJobStatus());
+ Assert.assertTrue(response.toString()
+ .contains(JOB_ALREADY_CANCELLED.getDescription()));
+ }
+
private SnapshotDiffReportOzone getSnapDiffReport(String volume,
String bucket,
String fromSnapshot,
@@ -639,7 +768,7 @@ public class TestOmSnapshot {
SnapshotDiffResponse response;
do {
response = store.snapshotDiff(volume, bucket, fromSnapshot,
- toSnapshot, null, 0, false);
+ toSnapshot, null, 0, false, false);
Thread.sleep(response.getWaitTimeInMs());
} while (response.getJobStatus() != DONE);
@@ -664,12 +793,12 @@ public class TestOmSnapshot {
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volume, bucket, snap1, snap2,
- null, 0, false));
+ null, 0, false, false));
// From snapshot is invalid
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volume, bucket, snap2, snap1,
- null, 0, false));
+ null, 0, false, false));
}
@Test
@@ -695,17 +824,17 @@ public class TestOmSnapshot {
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volumea, bucketb, snap1, snap2,
- null, 0, false));
+ null, 0, false, false));
// Volume is nonexistent
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volumeb, bucketa, snap2, snap1,
- null, 0, false));
+ null, 0, false, false));
// Both volume and bucket are nonexistent
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volumeb, bucketb, snap2, snap1,
- null, 0, false));
+ null, 0, false, false));
}
@Test
@@ -728,20 +857,20 @@ public class TestOmSnapshot {
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volume, bucket, snap1, nullstr,
- null, 0, false));
+ null, 0, false, false));
// From snapshot is empty
LambdaTestUtils.intercept(OMException.class,
"KEY_NOT_FOUND",
() -> store.snapshotDiff(volume, bucket, nullstr, snap1,
- null, 0, false));
+ null, 0, false, false));
// Bucket is empty
assertThrows(IllegalArgumentException.class,
() -> store.snapshotDiff(volume, nullstr, snap1, snap2,
- null, 0, false));
+ null, 0, false, false));
// Volume is empty
assertThrows(IllegalArgumentException.class,
() -> store.snapshotDiff(nullstr, bucket, snap1, snap2,
- null, 0, false));
+ null, 0, false, false));
}
@Test
@@ -815,7 +944,7 @@ public class TestOmSnapshot {
Assert.assertEquals(4, getKeyTableSstFiles().size());
SnapshotDiffReportOzone diff1 =
store.snapshotDiff(volumeName1, bucketName1, snap1, snap2,
- null, 0, false)
+ null, 0, false, false)
.getSnapshotDiffReport();
Assert.assertEquals(1, diff1.getDiffList().size());
}
@@ -976,7 +1105,7 @@ public class TestOmSnapshot {
createSnapshots(snapshot1, snapshot2);
SnapshotDiffResponse response = store.snapshotDiff(volumeName, bucketName,
- snapshot1, snapshot2, null, 0, false);
+ snapshot1, snapshot2, null, 0, false, false);
assertEquals(IN_PROGRESS, response.getJobStatus());
@@ -987,7 +1116,7 @@ public class TestOmSnapshot {
until(() -> cluster.getOzoneManager().isRunning());
response = store.snapshotDiff(volumeName, bucketName,
- snapshot1, snapshot2, null, 0, false);
+ snapshot1, snapshot2, null, 0, false, false);
// If job was IN_PROGRESS or DONE state when OM restarted, it should be
// DONE by this time.
@@ -1045,7 +1174,7 @@ public class TestOmSnapshot {
while (true) {
SnapshotDiffResponse response = store.snapshotDiff(volumeName,
bucketName,
- fromSnapshot, toSnapshot, token, pageSize, false);
+ fromSnapshot, toSnapshot, token, pageSize, false, false);
if (response.getJobStatus() == IN_PROGRESS) {
Thread.sleep(response.getWaitTimeInMs());
} else if (response.getJobStatus() == DONE) {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
index b294707e2f..7f071f1137 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java
@@ -113,7 +113,7 @@ public class TestOzoneManagerHASnapshot {
SnapshotDiffResponse response =
store.snapshotDiff(volumeName, bucketName,
- snapshot1, snapshot2, null, 0, false);
+ snapshot1, snapshot2, null, 0, false, false);
assertEquals(IN_PROGRESS, response.getJobStatus());
@@ -131,18 +131,18 @@ public class TestOzoneManagerHASnapshot {
if (Objects.equals(oldLeader, newLeader)) {
// If old leader becomes leader again. Job should be done by this time.
response = store.snapshotDiff(volumeName, bucketName,
- snapshot1, snapshot2, null, 0, false);
+ snapshot1, snapshot2, null, 0, false, false);
assertEquals(DONE, response.getJobStatus());
assertEquals(100, response.getSnapshotDiffReport().getDiffList().size());
} else {
// If new leader is different from old leader. SnapDiff request will be
// new to OM, and job status should be IN_PROGRESS.
response = store.snapshotDiff(volumeName, bucketName, snapshot1,
- snapshot2, null, 0, false);
+ snapshot2, null, 0, false, false);
assertEquals(IN_PROGRESS, response.getJobStatus());
while (true) {
response = store.snapshotDiff(volumeName, bucketName, snapshot1,
- snapshot2, null, 0, false);
+ snapshot2, null, 0, false, false);
if (DONE == response.getJobStatus()) {
assertEquals(100,
response.getSnapshotDiffReport().getDiffList().size());
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index ea0f9a9b02..d577c557fb 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1722,6 +1722,7 @@ message SnapshotDiffRequest {
optional string token = 5;
optional uint32 pageSize = 6;
optional bool forceFullDiff = 7;
+ optional bool cancel = 8;
}
message DeleteSnapshotRequest {
@@ -1800,10 +1801,22 @@ message SnapshotDiffResponse {
DONE = 3;
REJECTED = 4;
FAILED = 5;
+ CANCELLED = 6;
}
+
+ enum JobCancelResultProto {
+ JOB_NOT_CANCELLED = 1;
+ NEW_JOB = 2;
+ JOB_DONE = 3;
+ INVALID_STATUS_TRANSITION = 4;
+ JOB_ALREADY_CANCELLED = 5;
+ CANCELLATION_SUCCESS = 6;
+ }
+
optional SnapshotDiffReportProto snapshotDiffReport = 1;
optional JobStatusProto jobStatus = 2;
optional int64 waitTimeInMs = 3;
+ optional JobCancelResultProto jobCancelResult = 4;
}
message DeleteSnapshotResponse {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index cdd39ead34..09afdcb45e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -489,6 +489,16 @@ public final class OmSnapshotManager implements
AutoCloseable {
return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX;
}
+ @VisibleForTesting
+ public SnapshotDiffManager getSnapshotDiffManager() {
+ return snapshotDiffManager;
+ }
+
+ @VisibleForTesting
+ public SnapshotDiffCleanupService getSnapshotDiffCleanupService() {
+ return snapshotDiffCleanupService;
+ }
+
/**
* Helper method to locate the end key with the given prefix and iterator.
* @param keyIter TableIterator
@@ -652,6 +662,15 @@ public final class OmSnapshotManager implements
AutoCloseable {
(keyParts[0].compareTo(OM_SNAPSHOT_INDICATOR) == 0);
}
+ public SnapshotDiffResponse cancelSnapshotDiff(final String volume,
+ final String bucket,
+ final String fromSnapshot,
+ final String toSnapshot)
+ throws IOException {
+ return snapshotDiffManager.cancelSnapshotDiff(volume,
+ bucket, fromSnapshot, toSnapshot);
+ }
+
public SnapshotDiffResponse getSnapshotDiffReport(final String volume,
final String bucket,
final String fromSnapshot,
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 4cd062eb92..c33158293c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4505,16 +4505,23 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
ozoneObj.getKeyName(), false);
}
+ @SuppressWarnings("parameternumber")
public SnapshotDiffResponse snapshotDiff(String volume,
String bucket,
String fromSnapshot,
String toSnapshot,
String token,
int pageSize,
- boolean forceFullDiff)
+ boolean forceFullDiff,
+ boolean cancel)
throws IOException {
- return omSnapshotManager.getSnapshotDiffReport(volume, bucket,
- fromSnapshot, toSnapshot, token, pageSize, forceFullDiff);
+ if (cancel) {
+ return omSnapshotManager.cancelSnapshotDiff(volume, bucket,
+ fromSnapshot, toSnapshot);
+ } else {
+ return omSnapshotManager.getSnapshotDiffReport(volume, bucket,
+ fromSnapshot, toSnapshot, token, pageSize, forceFullDiff);
+ }
}
private String reconfOzoneAdmins(String newVal) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
index dd8a3ad4ef..6a84cc4623 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
@@ -43,6 +43,7 @@ import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_JOB
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_MAX_JOBS_PURGE_PER_TASK;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_MAX_JOBS_PURGE_PER_TASK_DEFAULT;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.CANCELLED;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.FAILED;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED;
@@ -124,6 +125,17 @@ public class SnapshotDiffCleanupService extends
BackgroundService {
moveOldSnapDiffJobsToPurgeTable();
}
+ @VisibleForTesting
+ public byte[] getEntryFromPurgedJobTable(String jobId) {
+ try {
+ return db.get().get(snapDiffPurgedJobCfh,
+ codecRegistry.asRawData(jobId));
+ } catch (IOException | RocksDBException e) {
+ // TODO: [SNAPSHOT] Fail gracefully.
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Move the snapDiff jobs from snapDiffJobTable to purge table which are
* older than the allowed time or have FAILED or REJECTED status.
@@ -154,7 +166,8 @@ public class SnapshotDiffCleanupService extends
BackgroundService {
if (currentTimeMillis - snapDiffJob.getCreationTime() > maxAllowedTime
|| snapDiffJob.getStatus() == FAILED
- || snapDiffJob.getStatus() == REJECTED) {
+ || snapDiffJob.getStatus() == REJECTED
+ || snapDiffJob.getStatus() == CANCELLED) {
writeBatch.put(snapDiffPurgedJobCfh,
codecRegistry.asRawData(snapDiffJob.getJobId()),
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index d01d8fb60e..023e5311d8 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -74,6 +75,7 @@ import
org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult;
import org.apache.ozone.rocksdb.util.ManagedSstFileReader;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
@@ -103,6 +105,7 @@ import static
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotAct
import static
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getSnapshotInfo;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.CANCELLED;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.FAILED;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
@@ -247,6 +250,11 @@ public class SnapshotDiffManager implements AutoCloseable {
this.loadJobsOnStartUp();
}
+ @VisibleForTesting
+ public PersistentMap<String, SnapshotDiffJob> getSnapDiffJobTable() {
+ return snapDiffJobTable;
+ }
+
private Optional<ManagedSSTDumpTool> initSSTDumpTool(
final OzoneConfiguration conf) {
try {
@@ -367,6 +375,58 @@ public class SnapshotDiffManager implements AutoCloseable {
.getPath(), tablesToLookUp);
}
+ public SnapshotDiffResponse cancelSnapshotDiff(
+ final String volumeName,
+ final String bucketName,
+ final String fromSnapshotName,
+ final String toSnapshotName) throws IOException {
+ SnapshotInfo fsInfo = getSnapshotInfo(ozoneManager,
+ volumeName, bucketName, fromSnapshotName);
+ SnapshotInfo tsInfo = getSnapshotInfo(ozoneManager,
+ volumeName, bucketName, toSnapshotName);
+
+ String snapDiffJobKey = fsInfo.getSnapshotId() + DELIMITER +
+ tsInfo.getSnapshotId();
+ SnapshotDiffJob diffJob = snapDiffJobTable.get(snapDiffJobKey);
+
+ JobStatus jobStatus;
+ JobCancelResult jobCancelResult;
+
+ if (diffJob == null) {
+ jobCancelResult = JobCancelResult.NEW_JOB;
+ // JobStatus is needed to send a response back to the client.
+ // JobStatus can't be null, so set it to QUEUED.
+ // This won't be printed as part of the response
+ // and the job doesn't exist in the SnapDiffJob table,
+ // so submitting again the job, won't make any difference.
+ jobStatus = QUEUED;
+ } else {
+ if (Objects.equals(diffJob.getStatus(), IN_PROGRESS)) {
+ updateJobStatus(snapDiffJobKey, IN_PROGRESS, CANCELLED);
+ jobCancelResult = JobCancelResult.CANCELLATION_SUCCESS;
+ } else if (Objects.equals(diffJob.getStatus(), CANCELLED)) {
+ jobCancelResult = JobCancelResult.JOB_ALREADY_CANCELLED;
+ } else if (Objects.equals(diffJob.getStatus(), DONE)) {
+ jobCancelResult = JobCancelResult.JOB_DONE;
+ } else {
+ jobCancelResult = JobCancelResult.INVALID_STATUS_TRANSITION;
+ }
+ // Get again the status from the table
+ // in case it's updated to CANCELLED.
+ jobStatus = snapDiffJobTable.get(snapDiffJobKey).getStatus();
+ }
+
+ OFSPath snapshotRoot = getSnapshotRootPath(volumeName, bucketName);
+ SnapshotDiffReportOzone report = new SnapshotDiffReportOzone(
+ snapshotRoot.toString(), volumeName, bucketName,
+ fromSnapshotName, toSnapshotName, new ArrayList<>(), null);
+
+ // If cancel is a success then return SnapshotDiffReport.
+ // It will check the table and get that the job is cancelled,
+ // and return the appropriate response.
+ return new SnapshotDiffResponse(report, jobStatus, 0L, jobCancelResult);
+ }
+
public SnapshotDiffResponse getSnapshotDiffReport(
final String volumeName,
final String bucketName,
@@ -418,6 +478,12 @@ public class SnapshotDiffManager implements AutoCloseable {
bucketName, fromSnapshotName, toSnapshotName, new ArrayList<>(),
null),
REJECTED, defaultWaitTime);
+ case CANCELLED:
+ return new SnapshotDiffResponse(
+ new SnapshotDiffReportOzone(snapshotRoot.toString(), volumeName,
+ bucketName, fromSnapshotName, toSnapshotName, new ArrayList<>(),
+ null),
+ CANCELLED, 0L, JobCancelResult.CANCELLATION_SUCCESS);
default:
throw new IllegalStateException("Unknown snapshot job status: " +
snapDiffJob.getStatus());
@@ -507,11 +573,11 @@ public class SnapshotDiffManager implements AutoCloseable
{
OFSPath snapshotRoot = getSnapshotRootPath(volume, bucket);
- // This is only possible if another thread tired to submit the request,
+ // This is only possible if another thread tried to submit the request,
// and it got rejected. In this scenario, return the Rejected job status
// with wait time.
if (snapDiffJob == null) {
- LOG.info("Snap diff job has been removed for for volume: {}, " +
+ LOG.info("Snap diff job has been removed for volume: {}, " +
"bucket: {}, fromSnapshot: {} and toSnapshot: {}.",
volume, bucket, fromSnapshot, toSnapshot);
return new SnapshotDiffResponse(
@@ -623,20 +689,29 @@ public class SnapshotDiffManager implements AutoCloseable
{
return snapDiffJob;
}
- private void validateSnapshotsAreActive(final String volumeName,
- final String bucketName,
- final String fromSnapshotName,
- final String toSnapshotName)
+ private boolean areDiffJobAndSnapshotsActive(
+ final String volumeName, final String bucketName,
+ final String fromSnapshotName, final String toSnapshotName)
throws IOException {
SnapshotInfo fromSnapInfo = getSnapshotInfo(ozoneManager, volumeName,
bucketName, fromSnapshotName);
SnapshotInfo toSnapInfo = getSnapshotInfo(ozoneManager, volumeName,
bucketName, toSnapshotName);
+ String jobKey = fromSnapInfo.getSnapshotId() +
+ DELIMITER + toSnapInfo.getSnapshotId();
+
+ if (snapDiffJobTable.get(jobKey).getStatus()
+ .equals(CANCELLED)) {
+ return false;
+ }
checkSnapshotActive(fromSnapInfo, false);
checkSnapshotActive(toSnapInfo, false);
+
+ return true;
}
+ @SuppressWarnings("methodlength")
private void generateSnapshotDiffReport(final String jobKey,
final String jobId,
final String volumeName,
@@ -659,8 +734,10 @@ public class SnapshotDiffManager implements AutoCloseable {
Path path = Paths.get(sstBackupDirForSnapDiffJobs + "/" + jobId);
try {
- validateSnapshotsAreActive(volumeName, bucketName, fromSnapshotName,
- toSnapshotName);
+ if (!areDiffJobAndSnapshotsActive(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName)) {
+ return;
+ }
String fsKey = getTableKey(volumeName, bucketName, fromSnapshotName);
String tsKey = getTableKey(volumeName, bucketName, toSnapshotName);
@@ -710,43 +787,67 @@ public class SnapshotDiffManager implements AutoCloseable
{
boolean useFullDiff = snapshotForceFullDiff || forceFullDiff;
- validateSnapshotsAreActive(volumeName, bucketName, fromSnapshotName,
- toSnapshotName);
+ if (!areDiffJobAndSnapshotsActive(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName)) {
+ return;
+ }
Table<String, OmKeyInfo> fsKeyTable = fromSnapshot.getMetadataManager()
.getKeyTable(bucketLayout);
Table<String, OmKeyInfo> tsKeyTable = toSnapshot.getMetadataManager()
.getKeyTable(bucketLayout);
- getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsKeyTable, tsKeyTable,
- fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff,
- tablePrefixes, objectIdToKeyNameMapForFromSnapshot,
- objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
- path.toString());
-
- if (bucketLayout.isFileSystemOptimized()) {
- validateSnapshotsAreActive(volumeName, bucketName, fromSnapshotName,
- toSnapshotName);
-
- Table<String, OmDirectoryInfo> fsDirTable =
- fromSnapshot.getMetadataManager().getDirectoryTable();
- Table<String, OmDirectoryInfo> tsDirTable =
- toSnapshot.getMetadataManager().getDirectoryTable();
-
- getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsDirTable, tsDirTable,
- fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff,
- tablePrefixes, objectIdToKeyNameMapForFromSnapshot,
- objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
- path.toString());
- }
-
- validateSnapshotsAreActive(volumeName, bucketName, fromSnapshotName,
- toSnapshotName);
- long totalDiffEntries = generateDiffReport(jobId,
- objectIDsToCheckMap,
- objectIdToKeyNameMapForFromSnapshot,
- objectIdToKeyNameMapForToSnapshot);
+ // These are the most time and resource consuming method calls.
+ // Split the calls into steps and store them in an array, to avoid
+ // repetition while constantly checking if the job is cancelled.
+ Callable<Void>[] methodCalls = new Callable[]{
+ () -> {
+ getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsKeyTable, tsKeyTable,
+ fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff,
+ tablePrefixes, objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ path.toString());
+ return null;
+ },
+ () -> {
+ if (bucketLayout.isFileSystemOptimized()) {
+ Table<String, OmDirectoryInfo> fsDirTable =
+ fromSnapshot.getMetadataManager().getDirectoryTable();
+ Table<String, OmDirectoryInfo> tsDirTable =
+ toSnapshot.getMetadataManager().getDirectoryTable();
+
+ getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsDirTable,
tsDirTable,
+ fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff,
+ tablePrefixes, objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot, objectIDsToCheckMap,
+ path.toString());
+ }
+ return null;
+ },
+ () -> {
+ long totalDiffEntries = generateDiffReport(jobId,
+ objectIDsToCheckMap,
+ objectIdToKeyNameMapForFromSnapshot,
+ objectIdToKeyNameMapForToSnapshot,
+ volumeName, bucketName,
+ fromSnapshotName, toSnapshotName);
+ // If job is cancelled, totalDiffEntries will be equal to -1.
+ if (totalDiffEntries >= 0 &&
+ areDiffJobAndSnapshotsActive(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName)) {
+ updateJobStatusToDone(jobKey, totalDiffEntries);
+ }
+ return null;
+ }
+ };
- updateJobStatusToDone(jobKey, totalDiffEntries);
+ // Check if the job is cancelled, before every method call.
+ for (Callable<Void> methodCall : methodCalls) {
+ if (!areDiffJobAndSnapshotsActive(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName)) {
+ return;
+ }
+ methodCall.call();
+ }
} catch (ExecutionException | IOException | RocksDBException exception) {
updateJobStatus(jobKey, IN_PROGRESS, FAILED);
LOG.error("Caught checked exception during diff report generation for " +
@@ -963,12 +1064,16 @@ public class SnapshotDiffManager implements
AutoCloseable {
}
}
-
+ @SuppressWarnings("checkstyle:ParameterNumber")
long generateDiffReport(final String jobId,
final PersistentSet<byte[]> objectIDsToCheck,
final PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
final PersistentMap<byte[], byte[]>
- newObjIdToKeyMap) {
+ newObjIdToKeyMap,
+ final String volumeName,
+ final String bucketName,
+ final String fromSnapshotName,
+ final String toSnapshotName) {
LOG.debug("Starting diff report generation for jobId: {}.", jobId);
ColumnFamilyHandle deleteDiffColumnFamily = null;
ColumnFamilyHandle renameDiffColumnFamily = null;
@@ -999,7 +1104,17 @@ public class SnapshotDiffManager implements AutoCloseable
{
try (ClosableIterator<byte[]>
objectIdsIterator = objectIDsToCheck.iterator()) {
+ // This counter is used, so that we can check every 100 elements
+ // if the job is cancelled and snapshots are still active.
+ int counter = 0;
+
while (objectIdsIterator.hasNext()) {
+ if (counter % 100 == 0 &&
+ !areDiffJobAndSnapshotsActive(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName)) {
+ return -1L;
+ }
+
byte[] id = objectIdsIterator.next();
/*
* This key can be
@@ -1047,6 +1162,8 @@ public class SnapshotDiffManager implements AutoCloseable
{
SnapshotDiffReportOzone.getDiffReportEntry(DiffType.RENAME,
oldKey, newKey)));
}
+
+ counter++;
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 3498003a50..4ca0cfa8df 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -1245,11 +1245,13 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
snapshotDiffRequest.getToSnapshot(),
snapshotDiffRequest.getToken(),
snapshotDiffRequest.getPageSize(),
- snapshotDiffRequest.getForceFullDiff());
+ snapshotDiffRequest.getForceFullDiff(),
+ snapshotDiffRequest.getCancel());
SnapshotDiffResponse.Builder builder = SnapshotDiffResponse.newBuilder()
.setJobStatus(response.getJobStatus().toProtobuf())
- .setWaitTimeInMs(response.getWaitTimeInMs());
+ .setWaitTimeInMs(response.getWaitTimeInMs())
+ .setJobCancelResult(response.getJobCancelResult().toProtobuf());
if (response.getSnapshotDiffReport() != null) {
builder.setSnapshotDiffReport(
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index bbca98081e..7b1a8ad88b 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -27,6 +27,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -41,12 +42,17 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.helpers.WithObjectID;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobCancelResult;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.rocksdb.util.ManagedSstFileReader;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
@@ -57,7 +63,9 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Matchers;
import org.mockito.Mock;
@@ -76,6 +84,7 @@ import org.rocksdb.RocksIterator;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -89,6 +98,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
+import java.util.stream.Stream;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
@@ -144,7 +154,7 @@ public class TestSnapshotDiffManager {
private OmSnapshot getMockedOmSnapshot(String snapshot) {
OmSnapshot omSnapshot = Mockito.mock(OmSnapshot.class);
- OMMetadataManager metadataManager = Mockito.mock(OMMetadataManager.class);
+ OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
DBStore dbStore = getMockedDBStore(snapshot);
Mockito.when(omSnapshot.getName()).thenReturn(snapshot);
Mockito.when(omSnapshot.getMetadataManager()).thenReturn(metadataManager);
@@ -152,8 +162,8 @@ public class TestSnapshotDiffManager {
return omSnapshot;
}
- private SnapshotDiffManager getMockedSnapshotDiffManager(int cacheSize) {
-
+ private SnapshotDiffManager getMockedSnapshotDiffManager(int cacheSize)
+ throws IOException {
Mockito.when(snapdiffDB.get()).thenReturn(rocksDB);
Mockito.when(rocksDB.newIterator(snapdiffJobCFH))
.thenReturn(jobTableIterator);
@@ -172,6 +182,9 @@ public class TestSnapshotDiffManager {
OMMetadataManager mockedMetadataManager =
Mockito.mock(OMMetadataManager.class);
RDBStore mockedRDBStore = Mockito.mock(RDBStore.class);
+ Path diffDir = Files.createTempDirectory("snapdiff_dir");
+ Mockito.when(mockedRDBStore.getSnapshotMetadataDir())
+ .thenReturn(diffDir.toString());
Mockito.when(mockedMetadataManager.getStore()).thenReturn(mockedRDBStore);
Mockito.when(ozoneManager.getMetadataManager())
.thenReturn(mockedMetadataManager);
@@ -179,6 +192,10 @@ public class TestSnapshotDiffManager {
new SnapshotDiffManager(snapdiffDB, differ, ozoneManager,
snapshotCache,
snapdiffJobCFH, snapdiffReportCFH, columnFamilyOptions,
codecRegistry));
+ PersistentMap<String, SnapshotDiffJob> snapDiffJobTable =
+ new SnapshotTestUtils.StubbedPersistentMap<>();
+ HddsWhiteboxTestUtils.setInternalState(snapshotDiffManager,
+ "snapDiffJobTable", snapDiffJobTable);
return snapshotDiffManager;
}
@@ -494,16 +511,38 @@ public class TestSnapshotDiffManager {
throw new RuntimeException(e);
}
});
+ String volumeName = "vol";
+ String bucketName = "buck";
+ String fromSnapName = "fs";
+ String toSnapName = "ts";
+ UUID fromSnapId = UUID.randomUUID();
+ UUID toSnapId = UUID.randomUUID();
+
SnapshotDiffManager snapshotDiffManager =
getMockedSnapshotDiffManager(10);
- snapshotDiffManager.generateDiffReport("jobId",
- objectIdsToCheck, oldObjectIdKeyMap, newObjectIdKeyMap);
+
+ setupMocksForRunningASnapDiff(volumeName, bucketName);
+ setUpSnapshots(volumeName, bucketName,
+ fromSnapName, toSnapName, fromSnapId, toSnapId);
+ String jobKey = fromSnapId + DELIMITER + toSnapId;
+
SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId",
- SnapshotDiffResponse.JobStatus.DONE, "vol", "buck", "fs", "ts",
+ JobStatus.IN_PROGRESS, volumeName,
+ bucketName, fromSnapName, toSnapName,
true, diffMap.size());
+
+ snapshotDiffManager.getSnapDiffJobTable().put(jobKey, snapshotDiffJob);
+
+ snapshotDiffManager.generateDiffReport("jobId",
+ objectIdsToCheck, oldObjectIdKeyMap, newObjectIdKeyMap,
+ volumeName, bucketName, fromSnapName, toSnapName);
+
+ snapshotDiffJob.setStatus(JobStatus.DONE);
+ snapshotDiffManager.getSnapDiffJobTable().put(jobKey, snapshotDiffJob);
+
SnapshotDiffReportOzone snapshotDiffReportOzone =
- snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
- "buck", "fs", "ts",
+ snapshotDiffManager.createPageResponse(snapshotDiffJob, volumeName,
+ bucketName, fromSnapName, toSnapName,
0, Integer.MAX_VALUE);
Set<SnapshotDiffReport.DiffType> expectedOrder = new LinkedHashSet<>();
expectedOrder.add(SnapshotDiffReport.DiffType.DELETE);
@@ -613,4 +652,282 @@ public class TestSnapshotDiffManager {
}
}
+ /**
+ * Once a job is cancelled, it stays in the table until
+ * SnapshotDiffCleanupService removes it.
+ *
+ * Job response until that happens, is CANCELLED.
+ */
+ @Test
+ public void testGetSnapshotDiffReportForCancelledJob()
+ throws IOException {
+ SnapshotDiffManager snapshotDiffManager =
+ getMockedSnapshotDiffManager(10);
+
+ String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
+ String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
+
+ String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+ String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+
+ UUID fromSnapshotUUID = UUID.randomUUID();
+ UUID toSnapshotUUID = UUID.randomUUID();
+
+ setupMocksForRunningASnapDiff(volumeName, bucketName);
+
+ setUpSnapshots(volumeName, bucketName, fromSnapshotName,
+ toSnapshotName, fromSnapshotUUID, toSnapshotUUID);
+
+ PersistentMap<String, SnapshotDiffJob> snapDiffJobTable =
+ snapshotDiffManager.getSnapDiffJobTable();
+ String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID;
+
+ SnapshotDiffJob diffJob = snapDiffJobTable.get(diffJobKey);
+ Assertions.assertNull(diffJob);
+
+ // Submit a new job.
+ SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager
+ .getSnapshotDiffReport(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName,
+ 0, 0, false);
+
+ Assertions.assertEquals(JobStatus.IN_PROGRESS,
+ snapshotDiffResponse.getJobStatus());
+
+ // Cancel the job.
+ snapshotDiffManager.cancelSnapshotDiff(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName);
+
+ // Job status should be cancelled until the cleanup
+ // service removes the job from the table.
+ snapshotDiffResponse = snapshotDiffManager
+ .getSnapshotDiffReport(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName,
+ 0, 0, false);
+
+ Assertions.assertEquals(JobStatus.CANCELLED,
+ snapshotDiffResponse.getJobStatus());
+
+ // Check snapDiffJobTable.
+ diffJob = snapDiffJobTable.get(diffJobKey);
+ Assertions.assertNotNull(diffJob);
+ Assertions.assertEquals(JobStatus.CANCELLED,
+ diffJob.getStatus());
+
+ // Response should still be cancelled.
+ snapshotDiffResponse = snapshotDiffManager
+ .getSnapshotDiffReport(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName,
+ 0, 0, false);
+
+ Assertions.assertEquals(JobStatus.CANCELLED,
+ snapshotDiffResponse.getJobStatus());
+
+ // Check snapDiffJobTable.
+ diffJob = snapDiffJobTable.get(diffJobKey);
+ Assertions.assertNotNull(diffJob);
+ Assertions.assertEquals(JobStatus.CANCELLED,
+ diffJob.getStatus());
+ }
+
+ private static Stream<Arguments> snapDiffCancelFailureScenarios() {
+ return Stream.of(
+ Arguments.of(JobStatus.IN_PROGRESS,
+ JobCancelResult.CANCELLATION_SUCCESS, true),
+ Arguments.of(JobStatus.CANCELLED,
+ JobCancelResult.JOB_ALREADY_CANCELLED, true),
+ Arguments.of(JobStatus.DONE,
+ JobCancelResult.JOB_DONE, false),
+ Arguments.of(JobStatus.QUEUED,
+ JobCancelResult.INVALID_STATUS_TRANSITION, false),
+ Arguments.of(JobStatus.FAILED,
+ JobCancelResult.INVALID_STATUS_TRANSITION, false),
+ Arguments.of(JobStatus.REJECTED,
+ JobCancelResult.INVALID_STATUS_TRANSITION, false)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("snapDiffCancelFailureScenarios")
+ public void testSnapshotDiffCancelFailure(JobStatus jobStatus,
+ JobCancelResult cancelResult,
+ boolean jobIsCancelled)
+ throws IOException {
+ SnapshotDiffManager snapshotDiffManager =
+ getMockedSnapshotDiffManager(10);
+
+ String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
+ String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
+
+ String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+ String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+
+ UUID fromSnapshotUUID = UUID.randomUUID();
+ UUID toSnapshotUUID = UUID.randomUUID();
+
+ setupMocksForRunningASnapDiff(volumeName, bucketName);
+
+ setUpSnapshots(volumeName, bucketName, fromSnapshotName,
+ toSnapshotName, fromSnapshotUUID, toSnapshotUUID);
+
+ PersistentMap<String, SnapshotDiffJob> snapDiffJobTable =
+ snapshotDiffManager.getSnapDiffJobTable();
+ String diffJobKey = fromSnapshotUUID + DELIMITER + toSnapshotUUID;
+
+ String jobId = UUID.randomUUID().toString();
+ SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0L,
+ jobId, jobStatus, volumeName, bucketName,
+ fromSnapshotName, toSnapshotName, true, 10);
+
+ snapDiffJobTable.put(diffJobKey, snapshotDiffJob);
+
+ SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager
+ .cancelSnapshotDiff(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName);
+
+ Assertions.assertEquals(cancelResult,
+ snapshotDiffResponse.getJobCancelResult());
+
+ if (jobIsCancelled) {
+ Assertions.assertEquals(JobStatus.CANCELLED,
+ snapshotDiffResponse.getJobStatus());
+ }
+ }
+
+ @Test
+ public void testCancelNewSnapshotDiff()
+ throws IOException {
+ SnapshotDiffManager snapshotDiffManager =
+ getMockedSnapshotDiffManager(10);
+
+ String volumeName = "vol-" + RandomStringUtils.randomNumeric(5);
+ String bucketName = "bucket-" + RandomStringUtils.randomNumeric(5);
+
+ String fromSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+ String toSnapshotName = "snap-" + RandomStringUtils.randomNumeric(5);
+
+ UUID fromSnapshotUUID = UUID.randomUUID();
+ UUID toSnapshotUUID = UUID.randomUUID();
+
+ setupMocksForRunningASnapDiff(volumeName, bucketName);
+
+ setUpSnapshots(volumeName, bucketName, fromSnapshotName,
+ toSnapshotName, fromSnapshotUUID, toSnapshotUUID);
+
+ SnapshotDiffResponse snapshotDiffResponse = snapshotDiffManager
+ .cancelSnapshotDiff(volumeName, bucketName,
+ fromSnapshotName, toSnapshotName);
+
+ // The job doesn't exist on the SnapDiffJob table and
+ // trying to cancel it should lead to NEW_JOB cancel result.
+ Assertions.assertEquals(JobCancelResult.NEW_JOB,
+ snapshotDiffResponse.getJobCancelResult());
+ }
+
+ private void setUpSnapshots(String volumeName, String bucketName,
+ String fromSnapshotName, String toSnapshotName,
+ UUID fromSnapshotUUID, UUID toSnapshotUUID)
+ throws IOException {
+ try (MockedStatic<SnapshotUtils> mockedSnapUtils =
+ Mockito.mockStatic(SnapshotUtils.class)) {
+ // Create 1st snapshot.
+ SnapshotInfo fromSnapshotInfo =
+ getSnapshotInfoInstance(volumeName, bucketName,
+ fromSnapshotName, fromSnapshotUUID);
+ mockedSnapUtils.when(() -> SnapshotUtils
+ .getSnapshotInfo(ozoneManager, volumeName,
+ bucketName, fromSnapshotName))
+ .thenReturn(fromSnapshotInfo);
+
+ String fromSnapKey = SnapshotInfo
+ .getTableKey(fromSnapshotInfo.getVolumeName(),
+ fromSnapshotInfo.getBucketName(), fromSnapshotInfo.getName());
+
+ Mockito.when(ozoneManager.getMetadataManager()
+ .getSnapshotInfoTable().get(fromSnapKey))
+ .thenReturn(fromSnapshotInfo);
+
+ mockedSnapUtils.when(() -> SnapshotUtils
+ .getSnapshotInfo(ozoneManager, fromSnapKey))
+ .thenReturn(fromSnapshotInfo);
+
+ OmSnapshot omSnapshotFrom = getMockedOmSnapshot(fromSnapKey);
+ snapshotCache.put(fromSnapKey, omSnapshotFrom);
+
+ // Create 2nd snapshot.
+ SnapshotInfo toSnapshotInfo =
+ getSnapshotInfoInstance(volumeName, bucketName,
+ toSnapshotName, toSnapshotUUID);
+
+ mockedSnapUtils.when(
+ () -> SnapshotUtils.getSnapshotInfo(ozoneManager,
+ volumeName, bucketName, toSnapshotName))
+ .thenReturn(toSnapshotInfo);
+
+ String toSnapKey = SnapshotInfo
+ .getTableKey(toSnapshotInfo.getVolumeName(),
+ toSnapshotInfo.getBucketName(), toSnapshotInfo.getName());
+
+ Mockito.when(ozoneManager.getMetadataManager()
+ .getSnapshotInfoTable().get(toSnapKey)).thenReturn(toSnapshotInfo);
+
+ mockedSnapUtils.when(() -> SnapshotUtils
+ .getSnapshotInfo(ozoneManager, toSnapKey))
+ .thenReturn(toSnapshotInfo);
+
+ OmSnapshot omSnapshotTo = getMockedOmSnapshot(toSnapKey);
+ snapshotCache.put(toSnapKey, omSnapshotTo);
+ }
+ }
+
+ private SnapshotInfo getSnapshotInfoInstance(
+ String volumeName, String bucketName,
+ String snapshotName, UUID snapshotUUID) {
+ SnapshotInfo snapshotInfo = SnapshotInfo
+ .newInstance(volumeName, bucketName,
+ snapshotName, snapshotUUID,
+ System.currentTimeMillis());
+ snapshotInfo.setSnapshotStatus(SnapshotInfo
+ .SnapshotStatus.SNAPSHOT_ACTIVE);
+ return snapshotInfo;
+ }
+
+ private void setupMocksForRunningASnapDiff(
+ String volumeName, String bucketName)
+ throws IOException {
+ Mockito.when(ozoneManager.getMetadataManager().getSnapshotInfoTable())
+ .thenReturn(Mockito.mock(Table.class));
+ Mockito.when(ozoneManager.getMetadataManager().getBucketTable())
+ .thenReturn(Mockito.mock(Table.class));
+
+ Map<BucketLayout, String> keyTableMap = new HashMap<>();
+ keyTableMap.put(BucketLayout.FILE_SYSTEM_OPTIMIZED,
+ OmMetadataManagerImpl.FILE_TABLE);
+ keyTableMap.put(BucketLayout.OBJECT_STORE,
+ OmMetadataManagerImpl.KEY_TABLE);
+ keyTableMap.put(BucketLayout.LEGACY,
+ OmMetadataManagerImpl.KEY_TABLE);
+
+ for (Map.Entry<BucketLayout, String> entry : keyTableMap.entrySet()) {
+ Mockito.when(ozoneManager.getMetadataManager()
+ .getKeyTable(entry.getKey()))
+ .thenReturn(Mockito.mock(Table.class));
+ Mockito.when(ozoneManager.getMetadataManager()
+ .getKeyTable(entry.getKey()).getName())
+ .thenReturn(entry.getValue());
+ }
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .setOwner(ugi.getShortUserName())
+ .build();
+
+ String bucketKey = ozoneManager.getMetadataManager()
+ .getBucketKey(volumeName, bucketName);
+ Mockito.when(ozoneManager.getMetadataManager().getBucketTable()
+ .get(bucketKey)).thenReturn(bucketInfo);
+ }
}
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 4905fd1d30..286e016b8e 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -698,7 +698,7 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
while (true) {
snapshotDiffResponse =
objectStore.snapshotDiff(volume.getName(), bucket.getName(),
- fromSnapshot, toSnapshot, token, -1, false);
+ fromSnapshot, toSnapshot, token, -1, false, false);
if (snapshotDiffResponse.getJobStatus() == DONE) {
break;
}
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 382f1df385..923899404d 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -1352,7 +1352,7 @@ public class BasicRootedOzoneClientAdapterImpl
while (true) {
snapshotDiffResponse =
objectStore.snapshotDiff(volume, bucket, fromSnapshot, toSnapshot,
- token, -1, false);
+ token, -1, false, false);
if (snapshotDiffResponse.getJobStatus() == DONE) {
break;
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index 2067f51832..8e30380b6a 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -639,7 +639,8 @@ public class ClientProtocolStub implements ClientProtocol {
String toSnapshot,
String token,
int pageSize,
- boolean forceFullDiff)
+ boolean forceFullDiff,
+ boolean cancel)
throws IOException {
return null;
}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/SnapshotDiffHandler.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/SnapshotDiffHandler.java
index dba33474a9..bf63ca4703 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/SnapshotDiffHandler.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/SnapshotDiffHandler.java
@@ -28,7 +28,7 @@ import java.io.IOException;
import java.io.PrintStream;
/**
- * ozone snapshot diff.
+ * ozone sh snapshot diff.
*/
@CommandLine.Command(name = "diff", aliases = "snapshotDiff",
description = "Get the differences between two snapshots")
@@ -60,6 +60,13 @@ public class SnapshotDiffHandler extends Handler {
hidden = true)
private boolean forceFullDiff;
+
+ @CommandLine.Option(names = {"-c", "--cancel"},
+ description = "Request to cancel a running SnapshotDiff job. " +
+ "If the job is not IN_PROGRESS, the request will fail.",
+ defaultValue = "false")
+ private boolean cancel;
+
@Override
protected OzoneAddress getAddress() {
return snapshotPath.getValue();
@@ -77,7 +84,7 @@ public class SnapshotDiffHandler extends Handler {
try (PrintStream stream = out()) {
stream.print(client.getObjectStore()
.snapshotDiff(volumeName, bucketName, fromSnapshot, toSnapshot,
- token, pageSize, forceFullDiff));
+ token, pageSize, forceFullDiff, cancel));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]