This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fe14f369e09 HDDS-14038. Optimize SnapshotDiff by removing intermediate
tables (#9399)
fe14f369e09 is described below
commit fe14f369e09f761dda8cf38ebc9d9ed2fb89a25a
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Mar 9 21:59:51 2026 +0100
HDDS-14038. Optimize SnapshotDiff by removing intermediate tables (#9399)
Co-authored-by: Wei-Chiu Chuang <[email protected]>
---
.../hadoop/ozone/om/helpers/SnapshotDiffJob.java | 36 ++-
.../om/helpers/TestOmSnapshotDiffJobCodec.java | 4 +-
.../hadoop/ozone/om/snapshot/TestOmSnapshot.java | 2 +-
.../src/main/proto/OmClientProtocol.proto | 1 +
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 20 +-
.../om/service/SnapshotDiffCleanupService.java | 8 +-
.../ozone/om/snapshot/SnapshotDiffManager.java | 292 +++++++++------------
.../om/service/TestSnapshotDiffCleanupService.java | 2 +-
.../ozone/om/snapshot/SnapshotTestUtils.java | 46 ++--
.../ozone/om/snapshot/TestSnapshotDiffManager.java | 125 +++++----
10 files changed, 269 insertions(+), 267 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
index b09ea1a4278..1e5da689f4b 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
@@ -34,8 +34,7 @@
*/
public class SnapshotDiffJob {
- private static final Codec<SnapshotDiffJob> CODEC =
- new SnapshotDiffJobCodec();
+ private static final Codec<SnapshotDiffJob> CODEC = new
SnapshotDiffJobCodec();
private long creationTime;
private String jobId;
@@ -47,6 +46,7 @@ public class SnapshotDiffJob {
private boolean forceFullDiff;
private boolean disableNativeDiff;
private long totalDiffEntries;
+ private String largestEntryKey;
// Reason tells why the job was FAILED. It should be set only if job status
// is FAILED.
@@ -76,7 +76,8 @@ public SnapshotDiffJob(long creationTime,
boolean disableNativeDiff,
long totalDiffEntries,
SubStatus subStatus,
- double keysProcessedPct) {
+ double keysProcessedPct,
+ String largestEntryKey) {
this.creationTime = creationTime;
this.jobId = jobId;
this.status = jobStatus;
@@ -90,6 +91,7 @@ public SnapshotDiffJob(long creationTime,
this.reason = StringUtils.EMPTY;
this.subStatus = subStatus;
this.keysProcessedPct = keysProcessedPct;
+ this.largestEntryKey = largestEntryKey;
}
public static Codec<SnapshotDiffJob> getCodec() {
@@ -160,12 +162,16 @@ public void setCreationTime(long creationTime) {
this.creationTime = creationTime;
}
+ public void setTotalDiffEntries(long totalDiffEntries) {
+ this.totalDiffEntries = totalDiffEntries;
+ }
+
public long getTotalDiffEntries() {
return totalDiffEntries;
}
- public void setTotalDiffEntries(long totalDiffEntries) {
- this.totalDiffEntries = totalDiffEntries;
+ public void setLargestEntryKey(String largestEntryKey) {
+ this.largestEntryKey = largestEntryKey;
}
public String getReason() {
@@ -200,6 +206,10 @@ public void setKeysProcessedPct(double keysProcessedPct) {
this.keysProcessedPct = keysProcessedPct;
}
+ public String getLargestEntryKey() {
+ return largestEntryKey;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder("creationTime :
").append(creationTime)
@@ -211,7 +221,8 @@ public String toString() {
.append(", toSnapshot: ").append(toSnapshot)
.append(", forceFullDiff: ").append(forceFullDiff)
.append(", disableNativeDiff: ").append(disableNativeDiff)
- .append(", totalDiffEntries: ").append(totalDiffEntries);
+ .append(", totalDiffEntries: ").append(totalDiffEntries)
+ .append(", largestEntryKey: ").append(largestEntryKey);
if (StringUtils.isNotEmpty(reason)) {
sb.append(", reason: ").append(reason);
@@ -246,7 +257,8 @@ public boolean equals(Object other) {
Objects.equals(this.disableNativeDiff, otherJob.disableNativeDiff)
&& Objects.equals(this.reason, otherJob.reason) &&
Objects.equals(this.subStatus, otherJob.subStatus) &&
- Objects.equals(this.keysProcessedPct, otherJob.keysProcessedPct);
+ Objects.equals(this.keysProcessedPct, otherJob.keysProcessedPct) &&
+ Objects.equals(this.largestEntryKey, otherJob.largestEntryKey);
}
return false;
}
@@ -255,7 +267,7 @@ public boolean equals(Object other) {
public int hashCode() {
return Objects.hash(creationTime, jobId, status, volume, bucket,
fromSnapshot, toSnapshot, forceFullDiff, disableNativeDiff,
- totalDiffEntries, reason, subStatus, keysProcessedPct);
+ totalDiffEntries, reason, subStatus, keysProcessedPct,
largestEntryKey);
}
public SnapshotDiffJobProto toProtoBuf() {
@@ -276,6 +288,10 @@ public SnapshotDiffJobProto toProtoBuf() {
if (subStatus != null) {
builder.setSubStatus(subStatus.toProtoBuf());
}
+ if (largestEntryKey != null) {
+ builder.setLargestEntryKey(largestEntryKey);
+ }
+
return builder.build();
}
@@ -285,6 +301,7 @@ public static SnapshotDiffJob getFromProtoBuf(
JobStatus.fromProtobuf(diffJobProto.getStatus()) : null;
SubStatus subStatus = (diffJobProto.hasSubStatus()) ?
SubStatus.fromProtoBuf(diffJobProto.getSubStatus()) : null;
+ String largestEntryKey = diffJobProto.hasLargestEntryKey() ?
diffJobProto.getLargestEntryKey() : null;
return new SnapshotDiffJob(
diffJobProto.getCreationTime(),
diffJobProto.getJobId(),
@@ -297,7 +314,8 @@ public static SnapshotDiffJob getFromProtoBuf(
diffJobProto.getDisableNativeDiff(),
diffJobProto.getTotalDiffEntries(),
subStatus,
- diffJobProto.getKeysProcessedPct());
+ diffJobProto.getKeysProcessedPct(),
+ largestEntryKey);
}
/**
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
index 6e86a73d472..b24d225d0cc 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
@@ -47,8 +47,7 @@ public void testOldJsonSerializedDataCanBeReadByNewCodec()
throws Exception {
false,
100L,
SubStatus.SST_FILE_DELTA_DAG_WALK,
- 0.0
- );
+ 0.0, "jobKey");
// Step 2: Serialize using the old Jackson-based codec
byte[] oldFormatData = oldCodec.toPersistedFormatImpl(original);
@@ -67,6 +66,7 @@ public void testOldJsonSerializedDataCanBeReadByNewCodec()
throws Exception {
assertEquals(original.isNativeDiffDisabled(),
parsed.isNativeDiffDisabled());
assertEquals(original.getSubStatus(), parsed.getSubStatus());
assertEquals(original.getTotalDiffEntries(), parsed.getTotalDiffEntries());
+ assertEquals(original.getLargestEntryKey(), parsed.getLargestEntryKey());
assertEquals(0.0, parsed.getKeysProcessedPct());
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
index 5f14451fad3..ccbf97f3c95 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
@@ -1442,7 +1442,7 @@ public void testSnapDiff() throws Exception {
IOException ioException = assertThrows(IOException.class,
() -> store.snapshotDiff(volume, bucket, snap6,
- snap7, "3", 0, forceFullSnapshotDiff, disableNativeDiff));
+ snap7, "400000000000000000003", 0, forceFullSnapshotDiff,
disableNativeDiff));
assertThat(ioException.getMessage()).contains("Index (given: 3) " +
"should be a number >= 0 and < totalDiffEntries: 2. Page size " +
"(given: 1000) should be a positive number > 0.");
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 94399e942c2..b6bed6b0198 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -917,6 +917,7 @@ message SnapshotDiffJobProto {
optional bool disableNativeDiff = 11;
optional SnapshotDiffResponse.SubStatus subStatus = 12;
optional double keysProcessedPct = 13;
+ optional string largestEntryKey = 14;
}
message OzoneObj {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 1608d1a8cef..56603a9f207 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -845,7 +845,7 @@ public SnapshotDiffResponse getSnapshotDiffReport(final
String volume,
return new SnapshotDiffResponse(diffReport, DONE, 0L);
}
- int index = getIndexFromToken(token);
+ String index = validateToken(token);
if (pageSize <= 0 || pageSize > maxPageSize) {
pageSize = maxPageSize;
}
@@ -969,24 +969,18 @@ public int getInFlightSnapshotCount() {
return inFlightSnapshotCount.get();
}
- private int getIndexFromToken(final String token) throws IOException {
+ private String validateToken(final String token) throws IOException {
if (isBlank(token)) {
- return 0;
+ return "";
}
- // Validate that token passed in the request is valid integer as of now.
- // Later we can change it if we migrate to encrypted or cursor token.
- try {
- int index = Integer.parseInt(token);
- if (index < 0) {
- throw new IOException("Passed token is invalid. Resend the request " +
- "with valid token returned in previous request.");
- }
- return index;
- } catch (NumberFormatException exception) {
+ // Validate that the token passed in the request matches the expected
format:
+ // <diffTypePrefix><20-digit-padded-index>. Update this logic if the token
format changes in the future.
+ if (!token.matches("[0-9]+")) {
throw new IOException("Passed token is invalid. " +
"Resend the request with valid token returned in previous request.");
}
+ return token;
}
private ManagedRocksDB createRocksDbForSnapshotDiff(
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
index e73d5ef4253..d4d759a4e4e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
@@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
@@ -195,14 +196,9 @@ private void removeOlderJobReport() {
if (totalNumberOfEntries > 0) {
byte[] beginKey = codecRegistry.asRawData(prefix + DELIMITER + 0);
- byte[] endKey = codecRegistry.asRawData(prefix + DELIMITER +
- (totalNumberOfEntries - 1));
+ byte[] endKey =
codecRegistry.asRawData(StringUtils.getLexicographicallyHigherString(prefix +
DELIMITER));
// Delete Range excludes the endKey.
- // Hence, we do two delete,
- // 1. deleteRange form beginKey(included) to endKey(excluded).
- // 2. delete endKey.
writeBatch.deleteRange(snapDiffReportCfh, beginKey, endKey);
- writeBatch.delete(snapDiffReportCfh, endKey);
}
// Finally, remove the entry from the purged job table.
writeBatch.delete(snapDiffPurgedJobCfh, key);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 2147fc3ec18..9dbf546f18d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om.snapshot;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.leftPad;
import static
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
import static
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE;
@@ -49,6 +50,7 @@
import static
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_JOB_NOT_EXIST;
import static
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_NON_CANCELLABLE;
import static
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_SUCCEEDED;
+import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone.getDiffReportEntry;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.CANCELLED;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.FAILED;
@@ -60,6 +62,7 @@
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_OBS;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.annotation.Nonnull;
@@ -74,6 +77,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -100,11 +104,13 @@
import org.apache.hadoop.hdds.utils.db.ManagedRawSSTFileReader;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.SstFileSetReader;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.ozone.OFSPath;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -142,11 +148,10 @@
* Class to generate snapshot diff.
*/
public class SnapshotDiffManager implements AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(SnapshotDiffManager.class);
- private static final String DELETE_DIFF_TABLE_SUFFIX = "-delete-diff";
- private static final String RENAME_DIFF_TABLE_SUFFIX = "-rename-diff";
- private static final String CREATE_DIFF_TABLE_SUFFIX = "-create-diff";
- private static final String MODIFY_DIFF_TABLE_SUFFIX = "-modify-diff";
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDiffManager.class);
+ private static final Map<DiffType, String> DIFF_TYPE_STRING_MAP =
+ new EnumMap<>(ImmutableMap.of(DELETE, "1", RENAME, "2", CREATE, "3",
MODIFY, "4"));
private final ManagedRocksDB db;
private final OzoneManager ozoneManager;
@@ -331,12 +336,49 @@ private void createEmptySnapDiffDir(Path path) {
}
}
+ static String getReportKeyForIndex(String jobId, String index) {
+ return jobId + DELIMITER + index;
+ }
+
+ static String getIndexFromReportKey(String reportKey) {
+ return reportKey.substring(reportKey.lastIndexOf(DELIMITER) + 1);
+ }
+
/**
* Gets the report key for a particular index of snapshot diff job.
+ * The order in which snap-diff should be applied
+ * 1. Delete diffs
+ * 2. Rename diffs
+ * 3. Create diffs
+ * 4. Modified diffs
+ *
+ * Consider the following scenario
+ *
+ * 1. File "A" is created.
+ * 2. File "B" is created.
+ * 3. File "C" is created.
+ * Snapshot "1" is taken.
+ *
+ * Case 1:
+ * 1. File "A" is deleted.
+ * 2. File "B" is renamed to "A".
+ * Snapshot "2" is taken.
+ *
+ * Snapshot diff should be applied in the following order:
+ * 1. Delete "A"
+ * 2. Rename "B" to "A"
+ *
+ * Case 2:
+ * 1. File "B" is renamed to "C".
+ * 2. File "B" is created.
+ * Snapshot "2" is taken.
+ *
+ * Snapshot diff should be applied in the following order:
+ * 1. Rename "B" to "C"
+ * 2. Create "B"
*/
-
- static String getReportKeyForIndex(String jobId, long index) {
- return jobId + DELIMITER + leftPad(String.valueOf(index), 20, '0');
+ static String getReportKeyForIndex(String jobId, DiffType diffType, long
index) {
+ return getReportKeyForIndex(jobId, DIFF_TYPE_STRING_MAP.get(diffType) +
leftPad(String.valueOf(index), 20, '0'));
}
public CancelSnapshotDiffResponse cancelSnapshotDiff(
@@ -442,7 +484,7 @@ public SnapshotDiffResponse getSnapshotDiffReport(
final String bucketName,
final String fromSnapshotName,
final String toSnapshotName,
- final int index,
+ final String index,
final int pageSize,
final boolean forceFullDiff,
final boolean disableNativeDiff
@@ -521,15 +563,33 @@ SnapshotDiffReportOzone createPageResponse(
final String bucketName,
final String fromSnapshotName,
final String toSnapshotName,
- final int index,
+ final String index,
final int pageSize
) throws IOException {
- if (index < 0 || index > snapDiffJob.getTotalDiffEntries()
- || pageSize <= 0) {
+ if (!isBlank(index)) {
+
DIFF_TYPE_STRING_MAP.values().stream().filter(index::startsWith).findFirst()
+ .orElseThrow(() -> new IOException("Token " + index + " has invalid
prefix. Valid prefixes: "
+ +
DIFF_TYPE_STRING_MAP.values().stream().map(String::valueOf).collect(Collectors.joining(","))));
+ }
+
+ int idx;
+ if (isBlank(index)) {
+ idx = 0;
+ } else {
+ try {
+ idx = Integer.parseInt(index.substring(1));
+ } catch (NumberFormatException e) {
+ throw new IOException("Token " + index + " has invalid numeric part: "
+
+ index.substring(1) + ". It should be a valid integer.", e);
+ }
+ }
+ if (idx < 0 ||
+ (snapDiffJob.getTotalDiffEntries() > 0 && idx >=
snapDiffJob.getTotalDiffEntries()) ||
+ pageSize <= 0) {
throw new IOException(String.format(
"Index (given: %d) should be a number >= 0 and < totalDiffEntries: "
+
"%d. Page size (given: %d) should be a positive number > 0.",
- index, snapDiffJob.getTotalDiffEntries(), pageSize));
+ idx, snapDiffJob.getTotalDiffEntries(), pageSize));
}
OFSPath path = getSnapshotRootPath(volumeName, bucketName);
@@ -545,39 +605,37 @@ SnapshotDiffReportOzone createPageResponse(
Pair<List<DiffReportEntry>, String> createPageResponse(
final SnapshotDiffJob snapDiffJob,
- final int index,
+ final String index,
final int pageSize
) throws IOException {
List<DiffReportEntry> diffReportList = new ArrayList<>();
- boolean hasMoreEntries = true;
-
byte[] lowerIndex = codecRegistry.asRawData(getReportKeyForIndex(
snapDiffJob.getJobId(), index));
- byte[] upperIndex = codecRegistry.asRawData(getReportKeyForIndex(
- snapDiffJob.getJobId(), index + pageSize));
- int idx = index;
+ String highestPossiblePrefix =
+
StringUtils.getLexicographicallyHigherString(DIFF_TYPE_STRING_MAP.values().stream()
+ .max(String::compareTo).get());
+ byte[] upperIndex =
codecRegistry.asRawData(getReportKeyForIndex(snapDiffJob.getJobId(),
highestPossiblePrefix));
try (ClosableIterator<Map.Entry<byte[], byte[]>> iterator =
- snapDiffReportTable.iterator(Optional.of(lowerIndex),
- Optional.of(upperIndex))) {
+ snapDiffReportTable.iterator(Optional.of(lowerIndex),
Optional.of(upperIndex))) {
int itemsFetched = 0;
+ String pageLastKey = "";
while (iterator.hasNext() && itemsFetched < pageSize) {
Map.Entry<byte[], byte[]> entry = iterator.next();
+ pageLastKey = StringCodec.get().fromPersistedFormat(entry.getKey());
byte[] bytes = entry.getValue();
- diffReportList.add(codecRegistry.asObject(bytes,
- DiffReportEntry.class));
- idx += 1;
+ diffReportList.add(codecRegistry.asObject(bytes,
DiffReportEntry.class));
itemsFetched += 1;
}
- if (diffReportList.size() < pageSize) {
- hasMoreEntries = false;
- }
+ // Next token
+ String nextTokenString = iterator.hasNext() ?
+
getIndexFromReportKey(StringCodec.get().fromPersistedFormat(iterator.next().getKey()))
: null;
+
+ checkReportsIntegrity(snapDiffJob, pageLastKey, nextTokenString == null);
+ return Pair.of(diffReportList, nextTokenString);
}
- String nextTokenString = hasMoreEntries ? String.valueOf(idx) : null;
- checkReportsIntegrity(snapDiffJob, index, diffReportList.size());
- return Pair.of(diffReportList, nextTokenString);
}
/**
@@ -588,16 +646,12 @@ Pair<List<DiffReportEntry>, String> createPageResponse(
*/
@VisibleForTesting
void checkReportsIntegrity(final SnapshotDiffJob diffJob,
- final int pageStartIdx,
- final int numberOfEntriesInPage)
- throws IOException {
- if ((pageStartIdx >= diffJob.getTotalDiffEntries() &&
- numberOfEntriesInPage != 0) || (pageStartIdx <
- diffJob.getTotalDiffEntries() && numberOfEntriesInPage == 0)) {
- LOG.error("Expected TotalDiffEntries: {} but found " +
- "TotalDiffEntries: {}",
- diffJob.getTotalDiffEntries(),
- pageStartIdx + numberOfEntriesInPage);
+ final String largestPageIndex,
+ boolean lastPage) throws IOException {
+ // For last page check last entry returned if the largest entry key equals
the largest key stored in the job entry.
+ if (lastPage && (diffJob.getLargestEntryKey() == null ||
!diffJob.getLargestEntryKey().equals(largestPageIndex))) {
+ LOG.error("Expected last entry: {} but found " +
+ "Largest Page entry: {}", diffJob.getLargestEntryKey(),
largestPageIndex);
updateJobStatus(diffJob.getJobId(), DONE, FAILED);
throw new IOException("Report integrity check failed. Retry after: " +
ozoneManager.getOmSnapshotManager().getDiffCleanupServiceInterval());
@@ -611,7 +665,7 @@ private synchronized SnapshotDiffResponse submitSnapDiffJob(
final String bucket,
final String fromSnapshot,
final String toSnapshot,
- final int index,
+ final String index,
final int pageSize,
final boolean forceFullDiff,
final boolean disableNativeDiff
@@ -731,7 +785,7 @@ private synchronized SnapshotDiffJob
getSnapDiffReportStatus(
String jobId = UUID.randomUUID().toString();
snapDiffJob = new SnapshotDiffJob(System.currentTimeMillis(), jobId,
QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName,
forceFullDiff,
- disableNativeDiff, 0L, null, 0.0);
+ disableNativeDiff, 0L, null, 0.0, null);
snapDiffJobTable.put(jobKey, snapDiffJob);
}
@@ -920,7 +974,7 @@ void generateSnapshotDiffReport(final String jobKey,
},
() -> {
recordActivity(jobKey, DIFF_REPORT_GEN);
- long totalDiffEntries = generateDiffReport(jobId,
+ Pair<Long, String> reportEntries = generateDiffReport(jobId,
fsKeyTable,
tsKeyTable,
fsDirTable,
@@ -933,10 +987,10 @@ void generateSnapshotDiffReport(final String jobKey,
bucketLayout.isFileSystemOptimized(), oldParentIdPathMap,
newParentIdPathMap, tablePrefixes);
// If job is cancelled, totalDiffEntries will be equal to -1.
- if (totalDiffEntries >= 0 &&
+ if (reportEntries.getKey() >= 0 &&
areDiffJobAndSnapshotsActive(volumeName, bucketName,
fromSnapshotName, toSnapshotName)) {
- updateJobStatusToDone(jobKey, totalDiffEntries);
+ updateJobStatusToDone(jobKey, reportEntries.getKey(),
reportEntries.getValue());
}
return null;
}
@@ -1125,7 +1179,7 @@ private String resolveBucketRelativePath(boolean
isFSOBucket,
}
@SuppressWarnings({"checkstyle:ParameterNumber", "checkstyle:MethodLength"})
- long generateDiffReport(
+ Pair<Long, String> generateDiffReport(
final String jobId,
final Table<String, OmKeyInfo> fsTable,
final Table<String, OmKeyInfo> tsTable,
@@ -1143,43 +1197,21 @@ long generateDiffReport(
final Optional<Map<Long, Path>> newParentIdPathMap,
final TablePrefixInfo tablePrefix) {
LOG.info("Starting diff report generation for jobId: {}.", jobId);
- ColumnFamilyHandle deleteDiffColumnFamily = null;
- ColumnFamilyHandle renameDiffColumnFamily = null;
- ColumnFamilyHandle createDiffColumnFamily = null;
- ColumnFamilyHandle modifyDiffColumnFamily = null;
// JobId is prepended to column family name to make it unique for request.
try {
- deleteDiffColumnFamily =
- createColumnFamily(jobId + DELETE_DIFF_TABLE_SUFFIX);
- renameDiffColumnFamily =
- createColumnFamily(jobId + RENAME_DIFF_TABLE_SUFFIX);
- createDiffColumnFamily =
- createColumnFamily(jobId + CREATE_DIFF_TABLE_SUFFIX);
- modifyDiffColumnFamily =
- createColumnFamily(jobId + MODIFY_DIFF_TABLE_SUFFIX);
-
- // Keep byte array instead of storing as DiffReportEntry to avoid
- // unnecessary serialization and deserialization.
- final PersistentList<byte[]> deleteDiffs =
- createDiffReportPersistentList(deleteDiffColumnFamily);
- final PersistentList<byte[]> renameDiffs =
- createDiffReportPersistentList(renameDiffColumnFamily);
- final PersistentList<byte[]> createDiffs =
- createDiffReportPersistentList(createDiffColumnFamily);
- final PersistentList<byte[]> modifyDiffs =
- createDiffReportPersistentList(modifyDiffColumnFamily);
-
try (ClosableIterator<Map.Entry<byte[], Boolean>>
iterator = objectIdToIsDirMap.iterator()) {
// This counter is used, so that we can check every 100 elements
// if the job is cancelled and snapshots are still active.
int counter = 0;
+ long index = 0;
+ String largestJobKey = "";
while (iterator.hasNext()) {
if (counter % 100 == 0 &&
!areDiffJobAndSnapshotsActive(volumeName, bucketName,
fromSnapshotName, toSnapshotName)) {
- return -1L;
+ return Pair.of(-1L, null);
}
Map.Entry<byte[], Boolean> nextEntry = iterator.next();
@@ -1201,7 +1233,7 @@ long generateDiffReport(
*/
byte[] oldKeyName = oldObjIdToKeyMap.get(id);
byte[] newKeyName = newObjIdToKeyMap.get(id);
-
+ String jobKey = "";
if (oldKeyName == null && newKeyName == null) {
// This cannot happen.
throw new IllegalStateException(
@@ -1210,26 +1242,23 @@ long generateDiffReport(
String key = resolveBucketRelativePath(isFSOBucket,
newParentIdPathMap, newKeyName, true);
if (key != null) {
- DiffReportEntry entry =
- SnapshotDiffReportOzone.getDiffReportEntry(CREATE, key);
- createDiffs.add(codecRegistry.asRawData(entry));
+ DiffReportEntry entry = getDiffReportEntry(CREATE, key);
+ jobKey = addToReport(jobId, index++, entry);
}
} else if (newKeyName == null) { // Key Deleted.
String key = resolveBucketRelativePath(isFSOBucket,
oldParentIdPathMap, oldKeyName, true);
if (key != null) {
- DiffReportEntry entry =
- SnapshotDiffReportOzone.getDiffReportEntry(DELETE, key);
- deleteDiffs.add(codecRegistry.asRawData(entry));
+ DiffReportEntry entry = getDiffReportEntry(DELETE, key);
+ jobKey = addToReport(jobId, index++, entry);
}
} else if (isDirectoryObject &&
Arrays.equals(oldKeyName, newKeyName)) {
String key = resolveBucketRelativePath(isFSOBucket,
newParentIdPathMap, newKeyName, true);
if (key != null) {
- DiffReportEntry entry =
- SnapshotDiffReportOzone.getDiffReportEntry(MODIFY, key);
- modifyDiffs.add(codecRegistry.asRawData(entry));
+ DiffReportEntry entry = getDiffReportEntry(MODIFY, key);
+ jobKey = addToReport(jobId, index++, entry);
}
} else {
String keyPrefix = tablePrefix.getTablePrefix((isDirectoryObject ?
fsDirTable : fsTable).getName());
@@ -1246,8 +1275,7 @@ long generateDiffReport(
"oldKey is null for oldKey : %s newKey: %s",
codecRegistry.asObject(oldKeyName, String.class),
codecRegistry.asObject(newKeyName, String.class)));
} else if (newKey == null) {
- deleteDiffs.add(codecRegistry.asRawData(SnapshotDiffReportOzone
- .getDiffReportEntry(DELETE, oldKey)));
+ jobKey = addToReport(jobId, index++, getDiffReportEntry(DELETE,
oldKey));
} else {
// Check if block location is same or not. If it is not same,
// key must have been overridden as well.
@@ -1256,72 +1284,29 @@ long generateDiffReport(
keyPrefix + codecRegistry.asObject(newKeyName, String.class),
isDirectoryObject ? fsDirTable : fsTable,
isDirectoryObject ? tsDirTable : tsTable);
+ if (!isObjectModified || !Arrays.equals(oldKeyName, newKeyName))
{
+ jobKey = addToReport(jobId, index++,
getDiffReportEntry(RENAME, oldKey, newKey));
+ }
if (isObjectModified) {
// Here, oldKey name is returned as modified. Modified key name
// is based on base snapshot (from snapshot).
- modifyDiffs.add(codecRegistry.asRawData(SnapshotDiffReportOzone
- .getDiffReportEntry(MODIFY, oldKey)));
- }
- if (!isObjectModified || !Arrays.equals(oldKeyName, newKeyName))
{
- renameDiffs.add(codecRegistry.asRawData(
- SnapshotDiffReportOzone.getDiffReportEntry(RENAME, oldKey,
- newKey)));
+ String modifiedJobKey = addToReport(jobId, index++,
getDiffReportEntry(MODIFY, oldKey));
+ if (modifiedJobKey.compareTo(jobKey) > 0) {
+ jobKey = modifiedJobKey;
+ }
}
}
}
+ if (jobKey.compareTo(largestJobKey) > 0) {
+ largestJobKey = jobKey;
+ }
counter++;
}
+ return Pair.of(index, largestJobKey);
}
-
- /*
- * The order in which snap-diff should be applied
- *
- * 1. Delete diffs
- * 2. Rename diffs
- * 3. Create diffs
- * 4. Modified diffs
- *
- * Consider the following scenario
- *
- * 1. File "A" is created.
- * 2. File "B" is created.
- * 3. File "C" is created.
- * Snapshot "1" is taken.
- *
- * Case 1:
- * 1. File "A" is deleted.
- * 2. File "B" is renamed to "A".
- * Snapshot "2" is taken.
- *
- * Snapshot diff should be applied in the following order:
- * 1. Delete "A"
- * 2. Rename "B" to "A"
- *
- *
- * Case 2:
- * 1. File "B" is renamed to "C".
- * 2. File "B" is created.
- * Snapshot "2" is taken.
- *
- * Snapshot diff should be applied in the following order:
- * 1. Rename "B" to "C"
- * 2. Create "B"
- *
- */
-
- long index = 0;
- index = addToReport(jobId, index, deleteDiffs);
- index = addToReport(jobId, index, renameDiffs);
- index = addToReport(jobId, index, createDiffs);
- return addToReport(jobId, index, modifyDiffs);
- } catch (RocksDBException | IOException e) {
+ } catch (IOException e) {
// TODO: [SNAPSHOT] Fail gracefully.
throw new RuntimeException(e);
- } finally {
- dropAndCloseColumnFamilyHandle(deleteDiffColumnFamily);
- dropAndCloseColumnFamilyHandle(renameDiffColumnFamily);
- dropAndCloseColumnFamilyHandle(createDiffColumnFamily);
- dropAndCloseColumnFamilyHandle(modifyDiffColumnFamily);
}
}
@@ -1365,15 +1350,6 @@ private boolean areAclsSame(OmDirectoryInfo fromObject,
return fromObject.getAcls().equals(toObject.getAcls());
}
- private PersistentList<byte[]> createDiffReportPersistentList(
- ColumnFamilyHandle columnFamilyHandle
- ) {
- return new RocksDbPersistentList<>(db,
- columnFamilyHandle,
- codecRegistry,
- byte[].class);
- }
-
private ColumnFamilyHandle createColumnFamily(String columnFamilyName)
throws RocksDBException {
return db.get().createColumnFamily(
@@ -1382,18 +1358,10 @@ private ColumnFamilyHandle createColumnFamily(String
columnFamilyName)
familyOptions));
}
- private long addToReport(String jobId, long index,
- PersistentList<byte[]> diffReportEntries)
- throws IOException {
- try (ClosableIterator<byte[]>
- diffReportIterator = diffReportEntries.iterator()) {
- while (diffReportIterator.hasNext()) {
- snapDiffReportTable.put(codecRegistry.asRawData(
- getReportKeyForIndex(jobId, index)), diffReportIterator.next());
- index++;
- }
- }
- return index;
+ private String addToReport(String jobId, long index, DiffReportEntry
diffReportEntry) throws IOException {
+ String jobReportKey = getReportKeyForIndex(jobId,
diffReportEntry.getType(), index);
+ snapDiffReportTable.put(codecRegistry.asRawData(jobReportKey),
codecRegistry.asRawData(diffReportEntry));
+ return jobReportKey;
}
private void dropAndCloseColumnFamilyHandle(
@@ -1462,7 +1430,8 @@ private synchronized void updateJobStatusToFailed(String
jobKey,
}
private synchronized void updateJobStatusToDone(String jobKey,
- long totalNumberOfEntries) {
+ long totalDiffEntries,
+ String largestJobKey) {
SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey);
if (snapshotDiffJob.getStatus() != IN_PROGRESS) {
throw new IllegalStateException("Invalid job status for jobID: " +
@@ -1472,7 +1441,8 @@ private synchronized void updateJobStatusToDone(String
jobKey,
}
snapshotDiffJob.setStatus(DONE);
- snapshotDiffJob.setTotalDiffEntries(totalNumberOfEntries);
+ snapshotDiffJob.setTotalDiffEntries(totalDiffEntries);
+ snapshotDiffJob.setLargestEntryKey(largestJobKey);
snapDiffJobTable.put(jobKey, snapshotDiffJob);
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
index 98e7ac62cf9..2dc9329601a 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
@@ -298,7 +298,7 @@ private SnapshotDiffJob addJobAndReport(JobStatus jobStatus,
SnapshotDiffJob job = new SnapshotDiffJob(creationTime, jobId, jobStatus,
volume, bucket, fromSnapshot, toSnapshot, false, false, noOfEntries,
- null, 0.0);
+ null, 0.0, jobId + "-" + noOfEntries);
db.get().put(jobTableCfh, codecRegistry.asRawData(jobKey),
codecRegistry.asRawData(job));
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
index 1b01c23afd4..9acf8e6b4f7 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
@@ -17,8 +17,10 @@
package org.apache.hadoop.ozone.om.snapshot;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
@@ -26,6 +28,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.ozone.util.ClosableIterator;
/**
@@ -33,27 +36,18 @@
*/
public class SnapshotTestUtils {
- private static <K> String getStringKey(K key) {
- if (key.getClass().isArray()) {
- Class<?> componentType = key.getClass().getComponentType();
- if (componentType == byte.class) {
- return Arrays.toString((byte[])key);
- } else if (componentType == int.class) {
- return Arrays.toString((int[])key);
- } else if (componentType == long.class) {
- return Arrays.toString((long[])key);
- } else if (componentType == float.class) {
- return Arrays.toString((float[])key);
- } else if (componentType == double.class) {
- return Arrays.toString((double[])key);
- } else if (componentType == char.class) {
- return Arrays.toString((char[])key);
- } else {
- return Arrays.toString((Object[])key);
- }
+ private static final CodecRegistry CODEC_REGISTRY =
CodecRegistry.newBuilder().build();
+
+ private static final Comparator<byte[]> UNSIGNED_BYTE_COMPARATOR =
UnsignedBytes.lexicographicalComparator();
+ private static final Comparator<Object> COMPARATOR = (k1, k2) -> {
+ try {
+ byte[] k1Bytes = CODEC_REGISTRY.asRawData(k1);
+ byte[] k2Bytes = CODEC_REGISTRY.asRawData(k2);
+ return UNSIGNED_BYTE_COMPARATOR.compare(k1Bytes, k2Bytes);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
- return key.toString();
- }
+ };
/**
* Stubbed implementation of CloseableIterator containing iterators.
@@ -92,13 +86,12 @@ public static class StubbedPersistentMap<K, V> implements
public StubbedPersistentMap(Map<K, V> map) {
this();
- map.entrySet().iterator().forEachRemaining(i ->
- this.put(i.getKey(), i.getValue()));
+ this.map.putAll(map);
}
public StubbedPersistentMap() {
- this.map = new TreeMap<>(
- Comparator.comparing(SnapshotTestUtils::getStringKey));
+
+ this.map = new TreeMap<>(COMPARATOR);
}
@Override
@@ -142,8 +135,7 @@ public StubbedPersistentSet(Set<K> map) {
}
public StubbedPersistentSet() {
- this.set = new TreeSet<>(
- Comparator.comparing(SnapshotTestUtils::getStringKey));
+ this.set = new TreeSet<>(COMPARATOR);
}
@Override
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index 5a82c3b1591..6327b712dee 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om.snapshot;
+import static org.apache.commons.lang3.StringUtils.leftPad;
import static
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT;
@@ -95,12 +96,14 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -279,7 +282,7 @@ public void init() throws RocksDBException, IOException,
ExecutionException {
SnapshotDiffJob diffJob = new SnapshotDiffJob(System.currentTimeMillis(),
UUID.randomUUID().toString(), jobStatus, VOLUME_NAME, BUCKET_NAME,
baseSnapshotName, targetSnapshotName, false, false, 0,
- null, 0.0);
+ null, 0.0, "");
snapshotNames.add(targetSnapshotName);
snapshotInfoList.add(targetSnapshot);
@@ -538,6 +541,8 @@ public void testGenerateDiffReport() throws IOException {
PersistentMap<byte[], Boolean> objectIdToIsDirMap =
new SnapshotTestUtils.StubbedPersistentMap<>();
Map<Long, SnapshotDiffReport.DiffType> diffMap = new HashMap<>();
+ AtomicInteger count = new AtomicInteger(0);
+ AtomicInteger maxIndex = new AtomicInteger(0);
LongStream.range(0, 100).forEach(objectId -> {
try {
String key = "key" + objectId;
@@ -557,20 +562,26 @@ public void testGenerateDiffReport() throws IOException {
byte[] renamedKeyBytes = codecRegistry.asRawData(renamedKey);
newObjectIdKeyMap.put(objectIdVal, renamedKeyBytes);
diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME);
+ count.getAndIncrement();
}
objectIdToIsDirMap.put(objectIdVal, false);
if (objectId >= 50 && objectId <= 100 ||
objectId >= 0 && objectId <= 25 && objectId % 4 > 2) {
diffMap.put(objectId, SnapshotDiffReport.DiffType.DELETE);
+ count.getAndIncrement();
}
if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0) {
diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME);
- }
- if (objectId >= 0 && objectId <= 25 && objectId % 4 == 2) {
- diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY);
+ count.getAndIncrement();
}
if (objectId > 25 && objectId < 50) {
diffMap.put(objectId, SnapshotDiffReport.DiffType.CREATE);
+ count.getAndIncrement();
+ }
+ if (objectId >= 0 && objectId <= 25 && objectId % 4 == 2) {
+ diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY);
+ maxIndex.set(count.get());
+ count.getAndIncrement();
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -622,20 +633,20 @@ public void testGenerateDiffReport() throws IOException {
.areDiffJobAndSnapshotsActive(volumeName, bucketName, fromSnapName,
toSnapName);
- long totalDiffEntries = spy.generateDiffReport("jobId",
+ Pair<Long, String> totalDiffEntries = spy.generateDiffReport("jobId",
fromSnapTable, toSnapTable, null, null,
objectIdToIsDirMap, oldObjectIdKeyMap, newObjectIdKeyMap,
volumeName, bucketName, fromSnapName, toSnapName, false,
Optional.empty(), Optional.empty(), tablePrefixes);
- assertEquals(100, totalDiffEntries);
+ assertEquals(100, totalDiffEntries.getKey());
SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId",
JobStatus.DONE, "vol", "buck", "fs", "ts", false,
- true, diffMap.size(), null, 0.0);
+ true, diffMap.size(), null, 0.0, "jobId-4" +
leftPad(String.valueOf(maxIndex.get()), 20, '0'));
SnapshotDiffReportOzone snapshotDiffReportOzone =
snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
"buck", "fs", "ts",
- 0, Integer.MAX_VALUE);
+ "", Integer.MAX_VALUE);
Set<SnapshotDiffReport.DiffType> expectedOrder = new LinkedHashSet<>();
expectedOrder.add(SnapshotDiffReport.DiffType.DELETE);
expectedOrder.add(SnapshotDiffReport.DiffType.RENAME);
@@ -667,36 +678,52 @@ private DiffReportEntry getTestDiffEntry(String jobId,
* objectId Map of diff keys to be checked with their corresponding key
names.
*/
@ParameterizedTest
- @CsvSource({"0,10,1000", "1,10,8", "10,1000,10", "-1,1000,10000",
- "1,0,1000", "1,-1,1000"})
+ @CsvSource({"0,10", "1,8", "2,8", "3,8"})
public void testCreatePageResponse(int startIdx,
- int pageSize,
int totalNumberOfRecords)
throws IOException, RocksDBException {
String testJobId = "jobId";
String testJobId2 = "jobId2";
+ Map<Integer, String> pageMap = new HashMap<>();
+ Map<Integer, SnapshotDiffReport.DiffType> typeMap = new HashMap<>();
+ List<Integer> indexList = IntStream.range(0,
totalNumberOfRecords).boxed().collect(Collectors.toList());
- IntStream.range(0, totalNumberOfRecords).boxed().forEach(idx -> {
+ for (int idx = 0; idx < totalNumberOfRecords; idx++) {
try {
+ DiffReportEntry reportEntry = getTestDiffEntry(testJobId, idx);
+ String reportEntryKey =
SnapshotDiffManager.getReportKeyForIndex(testJobId, reportEntry.getType(), idx);
+ typeMap.put(idx, reportEntry.getType());
+ pageMap.put(idx, reportEntryKey.split(DELIMITER)[1]);
+ String report2EntryKey = testJobId2 + DELIMITER + idx;
db.get().put(snapDiffReportTable,
- codecRegistry.asRawData(SnapshotDiffManager
- .getReportKeyForIndex(testJobId, idx)),
- codecRegistry.asRawData(getTestDiffEntry(testJobId, idx)));
+ codecRegistry.asRawData(reportEntryKey),
+ codecRegistry.asRawData(reportEntry));
db.get().put(snapDiffReportTable,
- codecRegistry.asRawData(testJobId2 + DELIMITER + idx),
+ codecRegistry.asRawData(report2EntryKey),
codecRegistry.asRawData(getTestDiffEntry(testJobId2, idx)));
} catch (IOException | RocksDBException e) {
throw new RuntimeException(e);
}
+ }
+ List<SnapshotDiffReport.DiffType> orderedList =
Arrays.asList(SnapshotDiffReport.DiffType.DELETE,
+ SnapshotDiffReport.DiffType.RENAME,
SnapshotDiffReport.DiffType.CREATE, SnapshotDiffReport.DiffType.MODIFY);
+
+ indexList.sort((idx1, idx2) -> {
+ if (typeMap.get(idx1) != typeMap.get(idx2)) {
+ return orderedList.indexOf(typeMap.get(idx1)) -
orderedList.indexOf(typeMap.get(idx2));
+ }
+ return idx1 - idx2;
});
+
SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId,
JobStatus.DONE, "vol", "buck", "fs", "ts", false,
- true, totalNumberOfRecords, null, 0.0);
+ true, totalNumberOfRecords, null, 0.0,
+ testJobId + DELIMITER + pageMap.get(indexList.get(indexList.size() -
1)));
SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2,
JobStatus.DONE, "vol", "buck", "fs", "ts", false,
- true, totalNumberOfRecords, null, 0.0);
+ true, totalNumberOfRecords, null, 0.0, "");
db.get().put(snapDiffJobTable,
codecRegistry.asRawData(testJobId),
@@ -705,26 +732,30 @@ public void testCreatePageResponse(int startIdx,
db.get().put(snapDiffJobTable,
codecRegistry.asRawData(testJobId2),
codecRegistry.asRawData(snapshotDiffJob2));
+ for (int pageSize = -1; pageSize <= totalNumberOfRecords; pageSize++) {
+ if (pageSize <= 0) {
+ int finalPageSize = pageSize;
+ assertThrows(IOException.class,
+ () -> snapshotDiffManager.createPageResponse(snapshotDiffJob,
"vol",
+ "buck", "fs", "ts", pageMap.get(startIdx), finalPageSize));
+ continue;
+ }
+ SnapshotDiffReportOzone snapshotDiffReportOzone =
+ snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
+ "buck", "fs", "ts",
+ pageMap.get(startIdx), pageSize);
+ int expectedTotalNumberOfRecords =
+ Math.max(Math.min(pageSize, totalNumberOfRecords -
indexList.indexOf(startIdx)), 0);
+ assertEquals(snapshotDiffReportOzone.getDiffList().size(),
expectedTotalNumberOfRecords);
+ int idx = indexList.indexOf(startIdx);
+ for (DiffReportEntry entry : snapshotDiffReportOzone.getDiffList()) {
+ assertEquals(getTestDiffEntry(testJobId, indexList.get(idx)), entry);
+ idx++;
+ }
- if (pageSize <= 0 || startIdx < 0) {
- assertThrows(IOException.class,
- () -> snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
- "buck", "fs", "ts", startIdx, pageSize));
- return;
- }
- SnapshotDiffReportOzone snapshotDiffReportOzone =
- snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
- "buck", "fs", "ts",
- startIdx, pageSize);
- int expectedTotalNumberOfRecords =
- Math.max(Math.min(pageSize, totalNumberOfRecords - startIdx), 0);
- assertEquals(snapshotDiffReportOzone.getDiffList().size(),
- expectedTotalNumberOfRecords);
-
- int idx = startIdx;
- for (DiffReportEntry entry : snapshotDiffReportOzone.getDiffList()) {
- assertEquals(getTestDiffEntry(testJobId, idx), entry);
- idx++;
+ assertEquals(snapshotDiffReportOzone.getToken() == null ? null :
+
Integer.parseInt(snapshotDiffReportOzone.getToken().substring(1)),
+ idx == indexList.size() ? null : indexList.get(idx));
}
}
@@ -766,7 +797,7 @@ public void testGetSnapshotDiffReportForCancelledJob()
throws IOException {
// Submit a new job.
SnapshotDiffResponse snapshotDiffResponse =
spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
- toSnapshotName, 0, 0, false, false);
+ toSnapshotName, "10", 0, false, false);
assertEquals(JobStatus.IN_PROGRESS,
snapshotDiffResponse.getJobStatus());
@@ -778,7 +809,7 @@ public void testGetSnapshotDiffReportForCancelledJob()
throws IOException {
// Job status should be cancelled until the cleanup
// service removes the job from the table.
snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName,
- fromSnapshotName, toSnapshotName, 0, 0, false, false);
+ fromSnapshotName, toSnapshotName, "10", 0, false, false);
assertEquals(JobStatus.CANCELLED,
snapshotDiffResponse.getJobStatus());
@@ -791,7 +822,7 @@ public void testGetSnapshotDiffReportForCancelledJob()
throws IOException {
// Response should still be cancelled.
snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName,
- fromSnapshotName, toSnapshotName, 0, 0, false, false);
+ fromSnapshotName, toSnapshotName, "100000000000000000000", 0, false,
false);
assertEquals(JobStatus.CANCELLED,
snapshotDiffResponse.getJobStatus());
@@ -841,7 +872,7 @@ public void testSnapshotDiffCancelFailure(JobStatus
jobStatus,
String jobId = UUID.randomUUID().toString();
SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0L,
jobId, jobStatus, volumeName, bucketName, fromSnapshotName,
toSnapshotName,
- true, false, 10, null, 0.0);
+ true, false, 10, null, 0.0, null);
snapDiffJobMap.put(diffJobKey, snapshotDiffJob);
@@ -932,7 +963,7 @@ public void testListSnapshotDiffJobs(String jobStatus,
// SnapshotDiffReport
SnapshotDiffResponse snapshotDiffResponse =
spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
- toSnapshotName, 0, 0, false, false);
+ toSnapshotName, "100000000000000000000", 0, false, false);
assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS,
snapshotDiffResponse.getJobStatus());
@@ -979,7 +1010,7 @@ public void testListSnapDiffWithInvalidStatus() throws
IOException {
eq(toSnapshotName), eq(false), eq(false));
spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
- toSnapshotName, 0, 0, false, false);
+ toSnapshotName, "10", 0, false, false);
// Invalid status, without listAllStatus true, results in an exception.
assertThrows(IOException.class, () -> snapshotDiffManager
@@ -994,7 +1025,7 @@ public void testGenerateDiffReportWhenThereInEntry() {
new StubbedPersistentMap<>();
PersistentMap<byte[], byte[]> newObjIdToKeyMap =
new StubbedPersistentMap<>();
- long totalDiffEntries = snapshotDiffManager.generateDiffReport("jobId",
+ Pair<Long, String> totalDiffEntries =
snapshotDiffManager.generateDiffReport("jobId",
keyInfoTable,
keyInfoTable,
null,
@@ -1011,7 +1042,7 @@ public void testGenerateDiffReportWhenThereInEntry() {
Optional.empty(),
new TablePrefixInfo(Collections.emptyMap()));
- assertEquals(0, totalDiffEntries);
+ assertEquals(0, totalDiffEntries.getKey());
}
@Test
@@ -1231,7 +1262,7 @@ private SnapshotDiffResponse
submitJob(SnapshotDiffManager diffManager,
String toSnapshotName) {
try {
return diffManager.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME,
- fromSnapshotName, toSnapshotName, 0, 1000, false, false);
+ fromSnapshotName, toSnapshotName, "100000000000000000000", 1000,
false, false);
} catch (IOException exception) {
throw new RuntimeException(exception);
}
@@ -1324,7 +1355,7 @@ public void testGetSnapshotDiffReportJob() throws
Exception {
for (int i = 0; i < snapshotInfoList.size(); i++) {
SnapshotDiffResponse snapshotDiffReport =
spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME,
- snapshotInfo.getName(), snapshotInfoList.get(i).getName(), 0,
+ snapshotInfo.getName(), snapshotInfoList.get(i).getName(),
"100000000000000000000",
1000, false, false);
SnapshotDiffJob diffJob = snapDiffJobs.get(i);
if (diffJob.getStatus() == QUEUED) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]