This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fe14f369e09 HDDS-14038. Optimize SnapshotDiff by removing intermediate 
tables (#9399)
fe14f369e09 is described below

commit fe14f369e09f761dda8cf38ebc9d9ed2fb89a25a
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Mar 9 21:59:51 2026 +0100

    HDDS-14038. Optimize SnapshotDiff by removing intermediate tables (#9399)
    
    Co-authored-by: Wei-Chiu Chuang <[email protected]>
---
 .../hadoop/ozone/om/helpers/SnapshotDiffJob.java   |  36 ++-
 .../om/helpers/TestOmSnapshotDiffJobCodec.java     |   4 +-
 .../hadoop/ozone/om/snapshot/TestOmSnapshot.java   |   2 +-
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |  20 +-
 .../om/service/SnapshotDiffCleanupService.java     |   8 +-
 .../ozone/om/snapshot/SnapshotDiffManager.java     | 292 +++++++++------------
 .../om/service/TestSnapshotDiffCleanupService.java |   2 +-
 .../ozone/om/snapshot/SnapshotTestUtils.java       |  46 ++--
 .../ozone/om/snapshot/TestSnapshotDiffManager.java | 125 +++++----
 10 files changed, 269 insertions(+), 267 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
index b09ea1a4278..1e5da689f4b 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java
@@ -34,8 +34,7 @@
  */
 public class SnapshotDiffJob {
 
-  private static final Codec<SnapshotDiffJob> CODEC =
-      new SnapshotDiffJobCodec();
+  private static final Codec<SnapshotDiffJob> CODEC = new 
SnapshotDiffJobCodec();
 
   private long creationTime;
   private String jobId;
@@ -47,6 +46,7 @@ public class SnapshotDiffJob {
   private boolean forceFullDiff;
   private boolean disableNativeDiff;
   private long totalDiffEntries;
+  private String largestEntryKey;
 
   // Reason tells why the job was FAILED. It should be set only if job status
   // is FAILED.
@@ -76,7 +76,8 @@ public SnapshotDiffJob(long creationTime,
                          boolean disableNativeDiff,
                          long totalDiffEntries,
                          SubStatus subStatus,
-                         double keysProcessedPct) {
+                         double keysProcessedPct,
+                         String largestEntryKey) {
     this.creationTime = creationTime;
     this.jobId = jobId;
     this.status = jobStatus;
@@ -90,6 +91,7 @@ public SnapshotDiffJob(long creationTime,
     this.reason = StringUtils.EMPTY;
     this.subStatus = subStatus;
     this.keysProcessedPct = keysProcessedPct;
+    this.largestEntryKey = largestEntryKey;
   }
 
   public static Codec<SnapshotDiffJob> getCodec() {
@@ -160,12 +162,16 @@ public void setCreationTime(long creationTime) {
     this.creationTime = creationTime;
   }
 
+  public void setTotalDiffEntries(long totalDiffEntries) {
+    this.totalDiffEntries = totalDiffEntries;
+  }
+
   public long getTotalDiffEntries() {
     return totalDiffEntries;
   }
 
-  public void setTotalDiffEntries(long totalDiffEntries) {
-    this.totalDiffEntries = totalDiffEntries;
+  public void setLargestEntryKey(String largestEntryKey) {
+    this.largestEntryKey = largestEntryKey;
   }
 
   public String getReason() {
@@ -200,6 +206,10 @@ public void setKeysProcessedPct(double keysProcessedPct) {
     this.keysProcessedPct = keysProcessedPct;
   }
 
+  public String getLargestEntryKey() {
+    return largestEntryKey;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder("creationTime : 
").append(creationTime)
@@ -211,7 +221,8 @@ public String toString() {
         .append(", toSnapshot: ").append(toSnapshot)
         .append(", forceFullDiff: ").append(forceFullDiff)
         .append(", disableNativeDiff: ").append(disableNativeDiff)
-        .append(", totalDiffEntries: ").append(totalDiffEntries);
+        .append(", totalDiffEntries: ").append(totalDiffEntries)
+        .append(", largestEntryKey: ").append(largestEntryKey);
 
     if (StringUtils.isNotEmpty(reason)) {
       sb.append(", reason: ").append(reason);
@@ -246,7 +257,8 @@ public boolean equals(Object other) {
           Objects.equals(this.disableNativeDiff, otherJob.disableNativeDiff)
           && Objects.equals(this.reason, otherJob.reason) &&
           Objects.equals(this.subStatus, otherJob.subStatus) &&
-          Objects.equals(this.keysProcessedPct, otherJob.keysProcessedPct);
+          Objects.equals(this.keysProcessedPct, otherJob.keysProcessedPct) &&
+          Objects.equals(this.largestEntryKey, otherJob.largestEntryKey);
     }
     return false;
   }
@@ -255,7 +267,7 @@ public boolean equals(Object other) {
   public int hashCode() {
     return Objects.hash(creationTime, jobId, status, volume, bucket,
         fromSnapshot, toSnapshot, forceFullDiff, disableNativeDiff,
-        totalDiffEntries, reason, subStatus, keysProcessedPct);
+        totalDiffEntries, reason, subStatus, keysProcessedPct, 
largestEntryKey);
   }
 
   public SnapshotDiffJobProto toProtoBuf() {
@@ -276,6 +288,10 @@ public SnapshotDiffJobProto toProtoBuf() {
     if (subStatus != null) {
       builder.setSubStatus(subStatus.toProtoBuf());
     }
+    if (largestEntryKey != null) {
+      builder.setLargestEntryKey(largestEntryKey);
+    }
+
     return builder.build();
   }
 
@@ -285,6 +301,7 @@ public static SnapshotDiffJob getFromProtoBuf(
         JobStatus.fromProtobuf(diffJobProto.getStatus()) : null;
     SubStatus subStatus = (diffJobProto.hasSubStatus()) ?
         SubStatus.fromProtoBuf(diffJobProto.getSubStatus()) : null;
+    String largestEntryKey = diffJobProto.hasLargestEntryKey() ? 
diffJobProto.getLargestEntryKey() : null;
     return new SnapshotDiffJob(
         diffJobProto.getCreationTime(),
         diffJobProto.getJobId(),
@@ -297,7 +314,8 @@ public static SnapshotDiffJob getFromProtoBuf(
         diffJobProto.getDisableNativeDiff(),
         diffJobProto.getTotalDiffEntries(),
         subStatus,
-        diffJobProto.getKeysProcessedPct());
+        diffJobProto.getKeysProcessedPct(),
+        largestEntryKey);
   }
 
   /**
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
index 6e86a73d472..b24d225d0cc 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java
@@ -47,8 +47,7 @@ public void testOldJsonSerializedDataCanBeReadByNewCodec() 
throws Exception {
         false,
         100L,
         SubStatus.SST_FILE_DELTA_DAG_WALK,
-        0.0
-    );
+        0.0, "jobKey");
 
     // Step 2: Serialize using the old Jackson-based codec
     byte[] oldFormatData = oldCodec.toPersistedFormatImpl(original);
@@ -67,6 +66,7 @@ public void testOldJsonSerializedDataCanBeReadByNewCodec() 
throws Exception {
     assertEquals(original.isNativeDiffDisabled(), 
parsed.isNativeDiffDisabled());
     assertEquals(original.getSubStatus(), parsed.getSubStatus());
     assertEquals(original.getTotalDiffEntries(), parsed.getTotalDiffEntries());
+    assertEquals(original.getLargestEntryKey(), parsed.getLargestEntryKey());
 
     assertEquals(0.0, parsed.getKeysProcessedPct());
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
index 5f14451fad3..ccbf97f3c95 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
@@ -1442,7 +1442,7 @@ public void testSnapDiff() throws Exception {
 
     IOException ioException = assertThrows(IOException.class,
         () -> store.snapshotDiff(volume, bucket, snap6,
-            snap7, "3", 0, forceFullSnapshotDiff, disableNativeDiff));
+            snap7, "400000000000000000003", 0, forceFullSnapshotDiff, 
disableNativeDiff));
     assertThat(ioException.getMessage()).contains("Index (given: 3) " +
         "should be a number >= 0 and < totalDiffEntries: 2. Page size " +
         "(given: 1000) should be a positive number > 0.");
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 94399e942c2..b6bed6b0198 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -917,6 +917,7 @@ message SnapshotDiffJobProto {
   optional bool disableNativeDiff = 11;
   optional SnapshotDiffResponse.SubStatus subStatus = 12;
   optional double keysProcessedPct = 13;
+  optional string largestEntryKey = 14;
 }
 
 message OzoneObj {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 1608d1a8cef..56603a9f207 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -845,7 +845,7 @@ public SnapshotDiffResponse getSnapshotDiffReport(final 
String volume,
       return new SnapshotDiffResponse(diffReport, DONE, 0L);
     }
 
-    int index = getIndexFromToken(token);
+    String index = validateToken(token);
     if (pageSize <= 0 || pageSize > maxPageSize) {
       pageSize = maxPageSize;
     }
@@ -969,24 +969,18 @@ public int getInFlightSnapshotCount() {
     return inFlightSnapshotCount.get();
   }
 
-  private int getIndexFromToken(final String token) throws IOException {
+  private String validateToken(final String token) throws IOException {
     if (isBlank(token)) {
-      return 0;
+      return "";
     }
 
-    // Validate that token passed in the request is valid integer as of now.
-    // Later we can change it if we migrate to encrypted or cursor token.
-    try {
-      int index = Integer.parseInt(token);
-      if (index < 0) {
-        throw new IOException("Passed token is invalid. Resend the request " +
-            "with valid token returned in previous request.");
-      }
-      return index;
-    } catch (NumberFormatException exception) {
+    // Validate that the token passed in the request matches the expected 
format:
+    // <diffTypePrefix><20-digit-padded-index>. Update this logic if the token 
format changes in the future.
+    if (!token.matches("[0-9]+")) {
       throw new IOException("Passed token is invalid. " +
           "Resend the request with valid token returned in previous request.");
     }
+    return token;
   }
 
   private ManagedRocksDB createRocksDbForSnapshotDiff(
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
index e73d5ef4253..d4d759a4e4e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
@@ -195,14 +196,9 @@ private void removeOlderJobReport() {
 
         if (totalNumberOfEntries > 0) {
           byte[] beginKey = codecRegistry.asRawData(prefix + DELIMITER + 0);
-          byte[] endKey = codecRegistry.asRawData(prefix + DELIMITER +
-              (totalNumberOfEntries - 1));
+          byte[] endKey = 
codecRegistry.asRawData(StringUtils.getLexicographicallyHigherString(prefix + 
DELIMITER));
           // Delete Range excludes the endKey.
-          // Hence, we do two delete,
-          //  1. deleteRange form beginKey(included) to endKey(excluded).
-          //  2. delete endKey.
           writeBatch.deleteRange(snapDiffReportCfh, beginKey, endKey);
-          writeBatch.delete(snapDiffReportCfh, endKey);
         }
         // Finally, remove the entry from the purged job table.
         writeBatch.delete(snapDiffPurgedJobCfh, key);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 2147fc3ec18..9dbf546f18d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om.snapshot;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.leftPad;
 import static 
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
 import static 
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE;
@@ -49,6 +50,7 @@
 import static 
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_JOB_NOT_EXIST;
 import static 
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_NON_CANCELLABLE;
 import static 
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_SUCCEEDED;
+import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone.getDiffReportEntry;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.CANCELLED;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.FAILED;
@@ -60,6 +62,7 @@
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_OBS;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import jakarta.annotation.Nonnull;
@@ -74,6 +77,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -100,11 +104,13 @@
 import org.apache.hadoop.hdds.utils.db.ManagedRawSSTFileReader;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.SstFileSetReader;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.ozone.OFSPath;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -142,11 +148,10 @@
  * Class to generate snapshot diff.
  */
 public class SnapshotDiffManager implements AutoCloseable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotDiffManager.class);
-  private static final String DELETE_DIFF_TABLE_SUFFIX = "-delete-diff";
-  private static final String RENAME_DIFF_TABLE_SUFFIX = "-rename-diff";
-  private static final String CREATE_DIFF_TABLE_SUFFIX = "-create-diff";
-  private static final String MODIFY_DIFF_TABLE_SUFFIX = "-modify-diff";
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SnapshotDiffManager.class);
+  private static final Map<DiffType, String> DIFF_TYPE_STRING_MAP =
+      new EnumMap<>(ImmutableMap.of(DELETE, "1", RENAME, "2", CREATE, "3", 
MODIFY, "4"));
 
   private final ManagedRocksDB db;
   private final OzoneManager ozoneManager;
@@ -331,12 +336,49 @@ private void createEmptySnapDiffDir(Path path) {
     }
   }
 
+  static String getReportKeyForIndex(String jobId, String index) {
+    return jobId + DELIMITER + index;
+  }
+
+  static String getIndexFromReportKey(String reportKey) {
+    return reportKey.substring(reportKey.lastIndexOf(DELIMITER) + 1);
+  }
+
   /**
    * Gets the report key for a particular index of snapshot diff job.
+   * The order in which snap-diff should be applied
+   *   1. Delete diffs
+   *   2. Rename diffs
+   *   3. Create diffs
+   *   4. Modified diffs
+   *
+   * Consider the following scenario
+   *
+   * 1. File "A" is created.
+   *   2. File "B" is created.
+   *   3. File "C" is created.
+   *   Snapshot "1" is taken.
+   *
+   * Case 1:
+   *   1. File "A" is deleted.
+   *   2. File "B" is renamed to "A".
+   *   Snapshot "2" is taken.
+   *
+   *   Snapshot diff should be applied in the following order:
+   *     1. Delete "A"
+   *     2. Rename "B" to "A"
+   *
+   * Case 2:
+   *   1. File "B" is renamed to "C".
+   *   2. File "B" is created.
+   *   Snapshot "2" is taken.
+   *
+   *   Snapshot diff should be applied in the following order:
+   *     1. Rename "B" to "C"
+   *     2. Create "B"
    */
-
-  static String getReportKeyForIndex(String jobId, long index) {
-    return jobId + DELIMITER + leftPad(String.valueOf(index), 20, '0');
+  static String getReportKeyForIndex(String jobId, DiffType diffType, long 
index) {
+    return getReportKeyForIndex(jobId, DIFF_TYPE_STRING_MAP.get(diffType) + 
leftPad(String.valueOf(index), 20, '0'));
   }
 
   public CancelSnapshotDiffResponse cancelSnapshotDiff(
@@ -442,7 +484,7 @@ public SnapshotDiffResponse getSnapshotDiffReport(
       final String bucketName,
       final String fromSnapshotName,
       final String toSnapshotName,
-      final int index,
+      final String index,
       final int pageSize,
       final boolean forceFullDiff,
       final boolean disableNativeDiff
@@ -521,15 +563,33 @@ SnapshotDiffReportOzone createPageResponse(
       final String bucketName,
       final String fromSnapshotName,
       final String toSnapshotName,
-      final int index,
+      final String index,
       final int pageSize
   ) throws IOException {
-    if (index < 0 || index > snapDiffJob.getTotalDiffEntries()
-        || pageSize <= 0) {
+    if (!isBlank(index)) {
+      
DIFF_TYPE_STRING_MAP.values().stream().filter(index::startsWith).findFirst()
+          .orElseThrow(() -> new IOException("Token " + index + " has invalid 
prefix. Valid prefixes: "
+              + 
DIFF_TYPE_STRING_MAP.values().stream().map(String::valueOf).collect(Collectors.joining(","))));
+    }
+
+    int idx;
+    if (isBlank(index)) {
+      idx = 0;
+    } else {
+      try {
+        idx = Integer.parseInt(index.substring(1));
+      } catch (NumberFormatException e) {
+        throw new IOException("Token " + index + " has invalid numeric part: " 
+
+            index.substring(1) + ". It should be a valid integer.", e);
+      }
+    }
+    if (idx < 0 ||
+        (snapDiffJob.getTotalDiffEntries() > 0 && idx >= 
snapDiffJob.getTotalDiffEntries()) ||
+        pageSize <= 0) {
       throw new IOException(String.format(
           "Index (given: %d) should be a number >= 0 and < totalDiffEntries: " 
+
               "%d. Page size (given: %d) should be a positive number > 0.",
-          index, snapDiffJob.getTotalDiffEntries(), pageSize));
+          idx, snapDiffJob.getTotalDiffEntries(), pageSize));
     }
 
     OFSPath path = getSnapshotRootPath(volumeName, bucketName);
@@ -545,39 +605,37 @@ SnapshotDiffReportOzone createPageResponse(
 
   Pair<List<DiffReportEntry>, String> createPageResponse(
       final SnapshotDiffJob snapDiffJob,
-      final int index,
+      final String index,
       final int pageSize
   ) throws IOException {
     List<DiffReportEntry> diffReportList = new ArrayList<>();
 
-    boolean hasMoreEntries = true;
-
     byte[] lowerIndex = codecRegistry.asRawData(getReportKeyForIndex(
         snapDiffJob.getJobId(), index));
-    byte[] upperIndex = codecRegistry.asRawData(getReportKeyForIndex(
-        snapDiffJob.getJobId(), index + pageSize));
-    int idx = index;
+    String highestPossiblePrefix =
+        
StringUtils.getLexicographicallyHigherString(DIFF_TYPE_STRING_MAP.values().stream()
+            .max(String::compareTo).get());
+    byte[] upperIndex = 
codecRegistry.asRawData(getReportKeyForIndex(snapDiffJob.getJobId(), 
highestPossiblePrefix));
     try (ClosableIterator<Map.Entry<byte[], byte[]>> iterator =
-             snapDiffReportTable.iterator(Optional.of(lowerIndex),
-                 Optional.of(upperIndex))) {
+             snapDiffReportTable.iterator(Optional.of(lowerIndex), 
Optional.of(upperIndex))) {
       int itemsFetched = 0;
+      String pageLastKey = "";
       while (iterator.hasNext() && itemsFetched < pageSize) {
         Map.Entry<byte[], byte[]> entry = iterator.next();
+        pageLastKey = StringCodec.get().fromPersistedFormat(entry.getKey());
         byte[] bytes = entry.getValue();
-        diffReportList.add(codecRegistry.asObject(bytes,
-            DiffReportEntry.class));
-        idx += 1;
+        diffReportList.add(codecRegistry.asObject(bytes, 
DiffReportEntry.class));
         itemsFetched += 1;
       }
-      if (diffReportList.size() < pageSize) {
-        hasMoreEntries = false;
-      }
+      // Next token
+      String nextTokenString = iterator.hasNext() ?
+          
getIndexFromReportKey(StringCodec.get().fromPersistedFormat(iterator.next().getKey()))
 : null;
+
+      checkReportsIntegrity(snapDiffJob, pageLastKey, nextTokenString == null);
+      return Pair.of(diffReportList, nextTokenString);
     }
 
-    String nextTokenString = hasMoreEntries ? String.valueOf(idx) : null;
 
-    checkReportsIntegrity(snapDiffJob, index, diffReportList.size());
-    return Pair.of(diffReportList, nextTokenString);
   }
 
   /**
@@ -588,16 +646,12 @@ Pair<List<DiffReportEntry>, String> createPageResponse(
    */
   @VisibleForTesting
   void checkReportsIntegrity(final SnapshotDiffJob diffJob,
-                             final int pageStartIdx,
-                             final int numberOfEntriesInPage)
-      throws IOException {
-    if ((pageStartIdx >= diffJob.getTotalDiffEntries() &&
-        numberOfEntriesInPage != 0) || (pageStartIdx <
-        diffJob.getTotalDiffEntries() && numberOfEntriesInPage == 0)) {
-      LOG.error("Expected TotalDiffEntries: {} but found " +
-              "TotalDiffEntries: {}",
-          diffJob.getTotalDiffEntries(),
-          pageStartIdx + numberOfEntriesInPage);
+                             final String largestPageIndex,
+                             boolean lastPage) throws IOException {
+    // For last page check last entry returned if the largest entry key equals 
the largest key stored in the job entry.
+    if (lastPage && (diffJob.getLargestEntryKey() == null || 
!diffJob.getLargestEntryKey().equals(largestPageIndex))) {
+      LOG.error("Expected last entry: {} but found " +
+              "Largest Page entry: {}", diffJob.getLargestEntryKey(), 
largestPageIndex);
       updateJobStatus(diffJob.getJobId(), DONE, FAILED);
       throw new IOException("Report integrity check failed. Retry after: " +
           ozoneManager.getOmSnapshotManager().getDiffCleanupServiceInterval());
@@ -611,7 +665,7 @@ private synchronized SnapshotDiffResponse submitSnapDiffJob(
       final String bucket,
       final String fromSnapshot,
       final String toSnapshot,
-      final int index,
+      final String index,
       final int pageSize,
       final boolean forceFullDiff,
       final boolean disableNativeDiff
@@ -731,7 +785,7 @@ private synchronized SnapshotDiffJob 
getSnapDiffReportStatus(
       String jobId = UUID.randomUUID().toString();
       snapDiffJob = new SnapshotDiffJob(System.currentTimeMillis(), jobId,
           QUEUED, volumeName, bucketName, fromSnapshotName, toSnapshotName, 
forceFullDiff,
-          disableNativeDiff, 0L, null, 0.0);
+          disableNativeDiff, 0L, null, 0.0, null);
       snapDiffJobTable.put(jobKey, snapDiffJob);
     }
 
@@ -920,7 +974,7 @@ void generateSnapshotDiffReport(final String jobKey,
           },
           () -> {
             recordActivity(jobKey, DIFF_REPORT_GEN);
-            long totalDiffEntries = generateDiffReport(jobId,
+            Pair<Long, String> reportEntries = generateDiffReport(jobId,
                 fsKeyTable,
                 tsKeyTable,
                 fsDirTable,
@@ -933,10 +987,10 @@ void generateSnapshotDiffReport(final String jobKey,
                 bucketLayout.isFileSystemOptimized(), oldParentIdPathMap,
                 newParentIdPathMap, tablePrefixes);
             // If job is cancelled, totalDiffEntries will be equal to -1.
-            if (totalDiffEntries >= 0 &&
+            if (reportEntries.getKey() >= 0 &&
                 areDiffJobAndSnapshotsActive(volumeName, bucketName,
                     fromSnapshotName, toSnapshotName)) {
-              updateJobStatusToDone(jobKey, totalDiffEntries);
+              updateJobStatusToDone(jobKey, reportEntries.getKey(), 
reportEntries.getValue());
             }
             return null;
           }
@@ -1125,7 +1179,7 @@ private String resolveBucketRelativePath(boolean 
isFSOBucket,
   }
 
   @SuppressWarnings({"checkstyle:ParameterNumber", "checkstyle:MethodLength"})
-  long generateDiffReport(
+  Pair<Long, String> generateDiffReport(
       final String jobId,
       final Table<String, OmKeyInfo> fsTable,
       final Table<String, OmKeyInfo> tsTable,
@@ -1143,43 +1197,21 @@ long generateDiffReport(
       final Optional<Map<Long, Path>> newParentIdPathMap,
       final TablePrefixInfo tablePrefix) {
     LOG.info("Starting diff report generation for jobId: {}.", jobId);
-    ColumnFamilyHandle deleteDiffColumnFamily = null;
-    ColumnFamilyHandle renameDiffColumnFamily = null;
-    ColumnFamilyHandle createDiffColumnFamily = null;
-    ColumnFamilyHandle modifyDiffColumnFamily = null;
 
     // JobId is prepended to column family name to make it unique for request.
     try {
-      deleteDiffColumnFamily =
-          createColumnFamily(jobId + DELETE_DIFF_TABLE_SUFFIX);
-      renameDiffColumnFamily =
-          createColumnFamily(jobId + RENAME_DIFF_TABLE_SUFFIX);
-      createDiffColumnFamily =
-          createColumnFamily(jobId + CREATE_DIFF_TABLE_SUFFIX);
-      modifyDiffColumnFamily =
-          createColumnFamily(jobId + MODIFY_DIFF_TABLE_SUFFIX);
-
-      // Keep byte array instead of storing as DiffReportEntry to avoid
-      // unnecessary serialization and deserialization.
-      final PersistentList<byte[]> deleteDiffs =
-          createDiffReportPersistentList(deleteDiffColumnFamily);
-      final PersistentList<byte[]> renameDiffs =
-          createDiffReportPersistentList(renameDiffColumnFamily);
-      final PersistentList<byte[]> createDiffs =
-          createDiffReportPersistentList(createDiffColumnFamily);
-      final PersistentList<byte[]> modifyDiffs =
-          createDiffReportPersistentList(modifyDiffColumnFamily);
-
       try (ClosableIterator<Map.Entry<byte[], Boolean>>
                iterator = objectIdToIsDirMap.iterator()) {
         // This counter is used, so that we can check every 100 elements
         // if the job is cancelled and snapshots are still active.
         int counter = 0;
+        long index = 0;
+        String largestJobKey = "";
         while (iterator.hasNext()) {
           if (counter % 100 == 0 &&
               !areDiffJobAndSnapshotsActive(volumeName, bucketName,
                   fromSnapshotName, toSnapshotName)) {
-            return -1L;
+            return Pair.of(-1L, null);
           }
 
           Map.Entry<byte[], Boolean> nextEntry = iterator.next();
@@ -1201,7 +1233,7 @@ long generateDiffReport(
            */
           byte[] oldKeyName = oldObjIdToKeyMap.get(id);
           byte[] newKeyName = newObjIdToKeyMap.get(id);
-
+          String jobKey = "";
           if (oldKeyName == null && newKeyName == null) {
             // This cannot happen.
             throw new IllegalStateException(
@@ -1210,26 +1242,23 @@ long generateDiffReport(
             String key = resolveBucketRelativePath(isFSOBucket,
                 newParentIdPathMap, newKeyName, true);
             if (key != null) {
-              DiffReportEntry entry =
-                  SnapshotDiffReportOzone.getDiffReportEntry(CREATE, key);
-              createDiffs.add(codecRegistry.asRawData(entry));
+              DiffReportEntry entry = getDiffReportEntry(CREATE, key);
+              jobKey = addToReport(jobId, index++, entry);
             }
           } else if (newKeyName == null) { // Key Deleted.
             String key = resolveBucketRelativePath(isFSOBucket,
                 oldParentIdPathMap, oldKeyName, true);
             if (key != null) {
-              DiffReportEntry entry =
-                  SnapshotDiffReportOzone.getDiffReportEntry(DELETE, key);
-              deleteDiffs.add(codecRegistry.asRawData(entry));
+              DiffReportEntry entry = getDiffReportEntry(DELETE, key);
+              jobKey = addToReport(jobId, index++, entry);
             }
           } else if (isDirectoryObject &&
               Arrays.equals(oldKeyName, newKeyName)) {
             String key = resolveBucketRelativePath(isFSOBucket,
                 newParentIdPathMap, newKeyName, true);
             if (key != null) {
-              DiffReportEntry entry =
-                  SnapshotDiffReportOzone.getDiffReportEntry(MODIFY, key);
-              modifyDiffs.add(codecRegistry.asRawData(entry));
+              DiffReportEntry entry = getDiffReportEntry(MODIFY, key);
+              jobKey = addToReport(jobId, index++, entry);
             }
           } else {
             String keyPrefix = tablePrefix.getTablePrefix((isDirectoryObject ? 
fsDirTable : fsTable).getName());
@@ -1246,8 +1275,7 @@ long generateDiffReport(
                       "oldKey is null for oldKey : %s newKey: %s", 
codecRegistry.asObject(oldKeyName, String.class),
                   codecRegistry.asObject(newKeyName, String.class)));
             } else if (newKey == null) {
-              deleteDiffs.add(codecRegistry.asRawData(SnapshotDiffReportOzone
-                  .getDiffReportEntry(DELETE, oldKey)));
+              jobKey = addToReport(jobId, index++, getDiffReportEntry(DELETE, 
oldKey));
             } else {
               // Check if block location is same or not. If it is not same,
               // key must have been overridden as well.
@@ -1256,72 +1284,29 @@ long generateDiffReport(
                   keyPrefix + codecRegistry.asObject(newKeyName, String.class),
                   isDirectoryObject ? fsDirTable : fsTable,
                   isDirectoryObject ? tsDirTable : tsTable);
+              if (!isObjectModified || !Arrays.equals(oldKeyName, newKeyName)) 
{
+                jobKey = addToReport(jobId, index++, 
getDiffReportEntry(RENAME, oldKey, newKey));
+              }
               if (isObjectModified) {
                 // Here, oldKey name is returned as modified. Modified key name
                 // is based on base snapshot (from snapshot).
-                modifyDiffs.add(codecRegistry.asRawData(SnapshotDiffReportOzone
-                    .getDiffReportEntry(MODIFY, oldKey)));
-              }
-              if (!isObjectModified || !Arrays.equals(oldKeyName, newKeyName)) 
{
-                renameDiffs.add(codecRegistry.asRawData(
-                    SnapshotDiffReportOzone.getDiffReportEntry(RENAME, oldKey,
-                        newKey)));
+                String modifiedJobKey = addToReport(jobId, index++, 
getDiffReportEntry(MODIFY, oldKey));
+                if (modifiedJobKey.compareTo(jobKey) > 0) {
+                  jobKey = modifiedJobKey;
+                }
               }
             }
           }
+          if (jobKey.compareTo(largestJobKey) > 0) {
+            largestJobKey = jobKey;
+          }
           counter++;
         }
+        return Pair.of(index, largestJobKey);
       }
-
-      /*
-       * The order in which snap-diff should be applied
-       *
-       *     1. Delete diffs
-       *     2. Rename diffs
-       *     3. Create diffs
-       *     4. Modified diffs
-       *
-       * Consider the following scenario
-       *
-       *    1. File "A" is created.
-       *    2. File "B" is created.
-       *    3. File "C" is created.
-       *    Snapshot "1" is taken.
-       *
-       * Case 1:
-       *   1. File "A" is deleted.
-       *   2. File "B" is renamed to "A".
-       *   Snapshot "2" is taken.
-       *
-       *   Snapshot diff should be applied in the following order:
-       *    1. Delete "A"
-       *    2. Rename "B" to "A"
-       *
-       *
-       * Case 2:
-       *    1. File "B" is renamed to "C".
-       *    2. File "B" is created.
-       *    Snapshot "2" is taken.
-       *
-       *   Snapshot diff should be applied in the following order:
-       *    1. Rename "B" to "C"
-       *    2. Create "B"
-       *
-       */
-
-      long index = 0;
-      index = addToReport(jobId, index, deleteDiffs);
-      index = addToReport(jobId, index, renameDiffs);
-      index = addToReport(jobId, index, createDiffs);
-      return addToReport(jobId, index, modifyDiffs);
-    } catch (RocksDBException | IOException e) {
+    } catch (IOException e) {
       // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(e);
-    } finally {
-      dropAndCloseColumnFamilyHandle(deleteDiffColumnFamily);
-      dropAndCloseColumnFamilyHandle(renameDiffColumnFamily);
-      dropAndCloseColumnFamilyHandle(createDiffColumnFamily);
-      dropAndCloseColumnFamilyHandle(modifyDiffColumnFamily);
     }
   }
 
@@ -1365,15 +1350,6 @@ private boolean areAclsSame(OmDirectoryInfo fromObject,
     return fromObject.getAcls().equals(toObject.getAcls());
   }
 
-  private PersistentList<byte[]> createDiffReportPersistentList(
-      ColumnFamilyHandle columnFamilyHandle
-  ) {
-    return new RocksDbPersistentList<>(db,
-        columnFamilyHandle,
-        codecRegistry,
-        byte[].class);
-  }
-
   private ColumnFamilyHandle createColumnFamily(String columnFamilyName)
       throws RocksDBException {
     return db.get().createColumnFamily(
@@ -1382,18 +1358,10 @@ private ColumnFamilyHandle createColumnFamily(String 
columnFamilyName)
             familyOptions));
   }
 
-  private long addToReport(String jobId, long index,
-                           PersistentList<byte[]> diffReportEntries)
-      throws IOException {
-    try (ClosableIterator<byte[]>
-             diffReportIterator = diffReportEntries.iterator()) {
-      while (diffReportIterator.hasNext()) {
-        snapDiffReportTable.put(codecRegistry.asRawData(
-            getReportKeyForIndex(jobId, index)), diffReportIterator.next());
-        index++;
-      }
-    }
-    return index;
+  private String addToReport(String jobId, long index, DiffReportEntry 
diffReportEntry) throws IOException {
+    String jobReportKey = getReportKeyForIndex(jobId, 
diffReportEntry.getType(), index);
+    snapDiffReportTable.put(codecRegistry.asRawData(jobReportKey), 
codecRegistry.asRawData(diffReportEntry));
+    return jobReportKey;
   }
 
   private void dropAndCloseColumnFamilyHandle(
@@ -1462,7 +1430,8 @@ private synchronized void updateJobStatusToFailed(String 
jobKey,
   }
 
   private synchronized void updateJobStatusToDone(String jobKey,
-                                                  long totalNumberOfEntries) {
+                                                  long totalDiffEntries,
+                                                  String largestJobKey) {
     SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey);
     if (snapshotDiffJob.getStatus() != IN_PROGRESS) {
       throw new IllegalStateException("Invalid job status for jobID: " +
@@ -1472,7 +1441,8 @@ private synchronized void updateJobStatusToDone(String 
jobKey,
     }
 
     snapshotDiffJob.setStatus(DONE);
-    snapshotDiffJob.setTotalDiffEntries(totalNumberOfEntries);
+    snapshotDiffJob.setTotalDiffEntries(totalDiffEntries);
+    snapshotDiffJob.setLargestEntryKey(largestJobKey);
     snapDiffJobTable.put(jobKey, snapshotDiffJob);
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
index 98e7ac62cf9..2dc9329601a 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDiffCleanupService.java
@@ -298,7 +298,7 @@ private SnapshotDiffJob addJobAndReport(JobStatus jobStatus,
 
     SnapshotDiffJob job = new SnapshotDiffJob(creationTime, jobId, jobStatus,
         volume, bucket, fromSnapshot, toSnapshot, false, false, noOfEntries,
-        null, 0.0);
+        null, 0.0, jobId + "-" + noOfEntries);
 
     db.get().put(jobTableCfh, codecRegistry.asRawData(jobKey),
         codecRegistry.asRawData(job));
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
index 1b01c23afd4..9acf8e6b4f7 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/SnapshotTestUtils.java
@@ -17,8 +17,10 @@
 
 package org.apache.hadoop.ozone.om.snapshot;
 
+import com.google.common.primitives.UnsignedBytes;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
@@ -26,6 +28,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.ozone.util.ClosableIterator;
 
 /**
@@ -33,27 +36,18 @@
  */
 public class SnapshotTestUtils {
 
-  private static <K> String getStringKey(K key) {
-    if (key.getClass().isArray()) {
-      Class<?> componentType = key.getClass().getComponentType();
-      if (componentType == byte.class) {
-        return Arrays.toString((byte[])key);
-      } else if (componentType == int.class) {
-        return Arrays.toString((int[])key);
-      } else if (componentType == long.class) {
-        return Arrays.toString((long[])key);
-      } else if (componentType == float.class) {
-        return Arrays.toString((float[])key);
-      } else if (componentType == double.class) {
-        return Arrays.toString((double[])key);
-      } else if (componentType == char.class) {
-        return Arrays.toString((char[])key);
-      } else {
-        return Arrays.toString((Object[])key);
-      }
+  private static final CodecRegistry CODEC_REGISTRY = 
CodecRegistry.newBuilder().build();
+
+  private static final Comparator<byte[]> UNSIGNED_BYTE_COMPARATOR = 
UnsignedBytes.lexicographicalComparator();
+  private static final Comparator<Object> COMPARATOR = (k1, k2) -> {
+    try {
+      byte[] k1Bytes = CODEC_REGISTRY.asRawData(k1);
+      byte[] k2Bytes = CODEC_REGISTRY.asRawData(k2);
+      return UNSIGNED_BYTE_COMPARATOR.compare(k1Bytes, k2Bytes);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
     }
-    return key.toString();
-  }
+  };
 
   /**
    * Stubbed implementation of CloseableIterator containing iterators.
@@ -92,13 +86,12 @@ public static class StubbedPersistentMap<K, V> implements
 
     public StubbedPersistentMap(Map<K, V> map) {
       this();
-      map.entrySet().iterator().forEachRemaining(i ->
-          this.put(i.getKey(), i.getValue()));
+      this.map.putAll(map);
     }
 
     public StubbedPersistentMap() {
-      this.map = new TreeMap<>(
-          Comparator.comparing(SnapshotTestUtils::getStringKey));
+
+      this.map = new TreeMap<>(COMPARATOR);
     }
 
     @Override
@@ -142,8 +135,7 @@ public StubbedPersistentSet(Set<K> map) {
     }
 
     public StubbedPersistentSet() {
-      this.set = new TreeSet<>(
-          Comparator.comparing(SnapshotTestUtils::getStringKey));
+      this.set = new TreeSet<>(COMPARATOR);
     }
 
     @Override
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index 5a82c3b1591..6327b712dee 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om.snapshot;
 
+import static org.apache.commons.lang3.StringUtils.leftPad;
 import static 
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT;
@@ -95,12 +96,14 @@
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -279,7 +282,7 @@ public void init() throws RocksDBException, IOException, 
ExecutionException {
       SnapshotDiffJob diffJob = new SnapshotDiffJob(System.currentTimeMillis(),
           UUID.randomUUID().toString(), jobStatus, VOLUME_NAME, BUCKET_NAME,
           baseSnapshotName, targetSnapshotName, false, false, 0,
-          null, 0.0);
+          null, 0.0, "");
 
       snapshotNames.add(targetSnapshotName);
       snapshotInfoList.add(targetSnapshot);
@@ -538,6 +541,8 @@ public void testGenerateDiffReport() throws IOException {
     PersistentMap<byte[], Boolean> objectIdToIsDirMap =
         new SnapshotTestUtils.StubbedPersistentMap<>();
     Map<Long, SnapshotDiffReport.DiffType> diffMap = new HashMap<>();
+    AtomicInteger count = new AtomicInteger(0);
+    AtomicInteger maxIndex = new AtomicInteger(0);
     LongStream.range(0, 100).forEach(objectId -> {
       try {
         String key = "key" + objectId;
@@ -557,20 +562,26 @@ public void testGenerateDiffReport() throws IOException {
           byte[] renamedKeyBytes = codecRegistry.asRawData(renamedKey);
           newObjectIdKeyMap.put(objectIdVal, renamedKeyBytes);
           diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME);
+          count.getAndIncrement();
         }
         objectIdToIsDirMap.put(objectIdVal, false);
         if (objectId >= 50 && objectId <= 100 ||
             objectId >= 0 && objectId <= 25 && objectId % 4 > 2) {
           diffMap.put(objectId, SnapshotDiffReport.DiffType.DELETE);
+          count.getAndIncrement();
         }
         if (objectId >= 0 && objectId <= 25 && objectId % 4 == 0) {
           diffMap.put(objectId, SnapshotDiffReport.DiffType.RENAME);
-        }
-        if (objectId >= 0 && objectId <= 25 && objectId % 4 == 2) {
-          diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY);
+          count.getAndIncrement();
         }
         if (objectId > 25 && objectId < 50) {
           diffMap.put(objectId, SnapshotDiffReport.DiffType.CREATE);
+          count.getAndIncrement();
+        }
+        if (objectId >= 0 && objectId <= 25 && objectId % 4 == 2) {
+          diffMap.put(objectId, SnapshotDiffReport.DiffType.MODIFY);
+          maxIndex.set(count.get());
+          count.getAndIncrement();
         }
       } catch (IOException e) {
         throw new RuntimeException(e);
@@ -622,20 +633,20 @@ public void testGenerateDiffReport() throws IOException {
           .areDiffJobAndSnapshotsActive(volumeName, bucketName, fromSnapName,
               toSnapName);
 
-      long totalDiffEntries = spy.generateDiffReport("jobId",
+      Pair<Long, String> totalDiffEntries = spy.generateDiffReport("jobId",
           fromSnapTable, toSnapTable, null, null,
           objectIdToIsDirMap, oldObjectIdKeyMap, newObjectIdKeyMap,
           volumeName, bucketName, fromSnapName, toSnapName, false,
           Optional.empty(), Optional.empty(), tablePrefixes);
 
-      assertEquals(100, totalDiffEntries);
+      assertEquals(100, totalDiffEntries.getKey());
       SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, "jobId",
           JobStatus.DONE, "vol", "buck", "fs", "ts", false,
-          true, diffMap.size(), null, 0.0);
+          true, diffMap.size(), null, 0.0, "jobId-4" + 
leftPad(String.valueOf(maxIndex.get()), 20, '0'));
       SnapshotDiffReportOzone snapshotDiffReportOzone =
           snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
               "buck", "fs", "ts",
-              0, Integer.MAX_VALUE);
+              "", Integer.MAX_VALUE);
       Set<SnapshotDiffReport.DiffType> expectedOrder = new LinkedHashSet<>();
       expectedOrder.add(SnapshotDiffReport.DiffType.DELETE);
       expectedOrder.add(SnapshotDiffReport.DiffType.RENAME);
@@ -667,36 +678,52 @@ private DiffReportEntry getTestDiffEntry(String jobId,
    * objectId Map of diff keys to be checked with their corresponding key 
names.
    */
   @ParameterizedTest
-  @CsvSource({"0,10,1000", "1,10,8", "10,1000,10", "-1,1000,10000",
-      "1,0,1000", "1,-1,1000"})
+  @CsvSource({"0,10", "1,8", "2,8", "3,8"})
   public void testCreatePageResponse(int startIdx,
-                                     int pageSize,
                                      int totalNumberOfRecords)
       throws IOException, RocksDBException {
     String testJobId = "jobId";
     String testJobId2 = "jobId2";
+    Map<Integer, String> pageMap = new HashMap<>();
+    Map<Integer, SnapshotDiffReport.DiffType> typeMap = new HashMap<>();
+    List<Integer> indexList = IntStream.range(0, 
totalNumberOfRecords).boxed().collect(Collectors.toList());
 
-    IntStream.range(0, totalNumberOfRecords).boxed().forEach(idx -> {
+    for (int idx = 0; idx < totalNumberOfRecords; idx++) {
       try {
+        DiffReportEntry reportEntry = getTestDiffEntry(testJobId, idx);
+        String reportEntryKey = 
SnapshotDiffManager.getReportKeyForIndex(testJobId, reportEntry.getType(), idx);
+        typeMap.put(idx, reportEntry.getType());
+        pageMap.put(idx, reportEntryKey.split(DELIMITER)[1]);
+        String report2EntryKey = testJobId2 + DELIMITER + idx;
         db.get().put(snapDiffReportTable,
-            codecRegistry.asRawData(SnapshotDiffManager
-                .getReportKeyForIndex(testJobId, idx)),
-            codecRegistry.asRawData(getTestDiffEntry(testJobId, idx)));
+            codecRegistry.asRawData(reportEntryKey),
+            codecRegistry.asRawData(reportEntry));
         db.get().put(snapDiffReportTable,
-            codecRegistry.asRawData(testJobId2 + DELIMITER + idx),
+            codecRegistry.asRawData(report2EntryKey),
             codecRegistry.asRawData(getTestDiffEntry(testJobId2, idx)));
       } catch (IOException | RocksDBException e) {
         throw new RuntimeException(e);
       }
+    }
+    List<SnapshotDiffReport.DiffType> orderedList = 
Arrays.asList(SnapshotDiffReport.DiffType.DELETE,
+        SnapshotDiffReport.DiffType.RENAME, 
SnapshotDiffReport.DiffType.CREATE, SnapshotDiffReport.DiffType.MODIFY);
+
+    indexList.sort((idx1, idx2) -> {
+      if (typeMap.get(idx1) != typeMap.get(idx2)) {
+        return orderedList.indexOf(typeMap.get(idx1)) - 
orderedList.indexOf(typeMap.get(idx2));
+      }
+      return idx1 - idx2;
     });
 
+
     SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0, testJobId,
         JobStatus.DONE, "vol", "buck", "fs", "ts", false,
-        true, totalNumberOfRecords, null, 0.0);
+        true, totalNumberOfRecords, null, 0.0,
+        testJobId + DELIMITER + pageMap.get(indexList.get(indexList.size() - 
1)));
 
     SnapshotDiffJob snapshotDiffJob2 = new SnapshotDiffJob(0, testJobId2,
         JobStatus.DONE, "vol", "buck", "fs", "ts", false,
-        true, totalNumberOfRecords, null, 0.0);
+        true, totalNumberOfRecords, null, 0.0, "");
 
     db.get().put(snapDiffJobTable,
         codecRegistry.asRawData(testJobId),
@@ -705,26 +732,30 @@ public void testCreatePageResponse(int startIdx,
     db.get().put(snapDiffJobTable,
         codecRegistry.asRawData(testJobId2),
         codecRegistry.asRawData(snapshotDiffJob2));
+    for (int pageSize = -1; pageSize <= totalNumberOfRecords; pageSize++) {
+      if (pageSize <= 0) {
+        int finalPageSize = pageSize;
+        assertThrows(IOException.class,
+            () -> snapshotDiffManager.createPageResponse(snapshotDiffJob, 
"vol",
+                "buck", "fs", "ts", pageMap.get(startIdx), finalPageSize));
+        continue;
+      }
+      SnapshotDiffReportOzone snapshotDiffReportOzone =
+          snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
+              "buck", "fs", "ts",
+              pageMap.get(startIdx), pageSize);
+      int expectedTotalNumberOfRecords =
+          Math.max(Math.min(pageSize, totalNumberOfRecords - 
indexList.indexOf(startIdx)), 0);
+      assertEquals(snapshotDiffReportOzone.getDiffList().size(), 
expectedTotalNumberOfRecords);
+      int idx = indexList.indexOf(startIdx);
+      for (DiffReportEntry entry : snapshotDiffReportOzone.getDiffList()) {
+        assertEquals(getTestDiffEntry(testJobId, indexList.get(idx)), entry);
+        idx++;
+      }
 
-    if (pageSize <= 0 || startIdx < 0) {
-      assertThrows(IOException.class,
-          () -> snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
-              "buck", "fs", "ts", startIdx, pageSize));
-      return;
-    }
-    SnapshotDiffReportOzone snapshotDiffReportOzone =
-        snapshotDiffManager.createPageResponse(snapshotDiffJob, "vol",
-            "buck", "fs", "ts",
-            startIdx, pageSize);
-    int expectedTotalNumberOfRecords =
-        Math.max(Math.min(pageSize, totalNumberOfRecords - startIdx), 0);
-    assertEquals(snapshotDiffReportOzone.getDiffList().size(),
-        expectedTotalNumberOfRecords);
-
-    int idx = startIdx;
-    for (DiffReportEntry entry : snapshotDiffReportOzone.getDiffList()) {
-      assertEquals(getTestDiffEntry(testJobId, idx), entry);
-      idx++;
+      assertEquals(snapshotDiffReportOzone.getToken() == null ? null :
+              
Integer.parseInt(snapshotDiffReportOzone.getToken().substring(1)),
+          idx == indexList.size() ? null : indexList.get(idx));
     }
   }
 
@@ -766,7 +797,7 @@ public void testGetSnapshotDiffReportForCancelledJob() 
throws IOException {
     // Submit a new job.
     SnapshotDiffResponse snapshotDiffResponse =
         spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
-            toSnapshotName, 0, 0, false, false);
+            toSnapshotName, "10", 0, false, false);
 
     assertEquals(JobStatus.IN_PROGRESS,
         snapshotDiffResponse.getJobStatus());
@@ -778,7 +809,7 @@ public void testGetSnapshotDiffReportForCancelledJob() 
throws IOException {
     // Job status should be cancelled until the cleanup
     // service removes the job from the table.
     snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName,
-        fromSnapshotName, toSnapshotName, 0, 0, false, false);
+        fromSnapshotName, toSnapshotName, "10", 0, false, false);
 
     assertEquals(JobStatus.CANCELLED,
         snapshotDiffResponse.getJobStatus());
@@ -791,7 +822,7 @@ public void testGetSnapshotDiffReportForCancelledJob() 
throws IOException {
 
     // Response should still be cancelled.
     snapshotDiffResponse = spy.getSnapshotDiffReport(volumeName, bucketName,
-        fromSnapshotName, toSnapshotName, 0, 0, false, false);
+        fromSnapshotName, toSnapshotName, "100000000000000000000", 0, false, 
false);
 
     assertEquals(JobStatus.CANCELLED,
         snapshotDiffResponse.getJobStatus());
@@ -841,7 +872,7 @@ public void testSnapshotDiffCancelFailure(JobStatus 
jobStatus,
     String jobId = UUID.randomUUID().toString();
     SnapshotDiffJob snapshotDiffJob = new SnapshotDiffJob(0L,
         jobId, jobStatus, volumeName, bucketName, fromSnapshotName, 
toSnapshotName,
-        true, false, 10, null, 0.0);
+        true, false, 10, null, 0.0, null);
 
     snapDiffJobMap.put(diffJobKey, snapshotDiffJob);
 
@@ -932,7 +963,7 @@ public void testListSnapshotDiffJobs(String jobStatus,
     // SnapshotDiffReport
     SnapshotDiffResponse snapshotDiffResponse =
         spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
-            toSnapshotName, 0, 0, false, false);
+            toSnapshotName, "100000000000000000000", 0, false, false);
 
     assertEquals(SnapshotDiffResponse.JobStatus.IN_PROGRESS,
         snapshotDiffResponse.getJobStatus());
@@ -979,7 +1010,7 @@ public void testListSnapDiffWithInvalidStatus() throws 
IOException {
         eq(toSnapshotName), eq(false), eq(false));
 
     spy.getSnapshotDiffReport(volumeName, bucketName, fromSnapshotName,
-            toSnapshotName, 0, 0, false, false);
+            toSnapshotName, "10", 0, false, false);
 
     // Invalid status, without listAllStatus true, results in an exception.
     assertThrows(IOException.class, () -> snapshotDiffManager
@@ -994,7 +1025,7 @@ public void testGenerateDiffReportWhenThereInEntry() {
         new StubbedPersistentMap<>();
     PersistentMap<byte[], byte[]> newObjIdToKeyMap =
         new StubbedPersistentMap<>();
-    long totalDiffEntries = snapshotDiffManager.generateDiffReport("jobId",
+    Pair<Long, String> totalDiffEntries = 
snapshotDiffManager.generateDiffReport("jobId",
         keyInfoTable,
         keyInfoTable,
         null,
@@ -1011,7 +1042,7 @@ public void testGenerateDiffReportWhenThereInEntry() {
         Optional.empty(),
         new TablePrefixInfo(Collections.emptyMap()));
 
-    assertEquals(0, totalDiffEntries);
+    assertEquals(0, totalDiffEntries.getKey());
   }
 
   @Test
@@ -1231,7 +1262,7 @@ private SnapshotDiffResponse 
submitJob(SnapshotDiffManager diffManager,
                                          String toSnapshotName) {
     try {
       return diffManager.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME,
-          fromSnapshotName, toSnapshotName, 0, 1000, false, false);
+          fromSnapshotName, toSnapshotName, "100000000000000000000", 1000, 
false, false);
     } catch (IOException exception) {
       throw new RuntimeException(exception);
     }
@@ -1324,7 +1355,7 @@ public void testGetSnapshotDiffReportJob() throws 
Exception {
     for (int i = 0; i < snapshotInfoList.size(); i++) {
       SnapshotDiffResponse snapshotDiffReport =
           spy.getSnapshotDiffReport(VOLUME_NAME, BUCKET_NAME,
-              snapshotInfo.getName(), snapshotInfoList.get(i).getName(), 0,
+              snapshotInfo.getName(), snapshotInfoList.get(i).getName(), 
"100000000000000000000",
               1000, false, false);
       SnapshotDiffJob diffJob = snapDiffJobs.get(i);
       if (diffJob.getStatus() == QUEUED) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to