This is an automated email from the ASF dual-hosted git repository.

ferhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cde7cb7  HDDS-6147. Add ability in OM to get limited delta updates 
(#2956)
cde7cb7 is described below

commit cde7cb766199dcba53eef53b3c13c63771eb87ac
Author: Symious <[email protected]>
AuthorDate: Tue Jan 18 15:04:36 2022 +0800

    HDDS-6147. Add ability in OM to get limited delta updates (#2956)
---
 .../org/apache/hadoop/hdds/utils/db/DBStore.java   | 10 +++++++++
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  | 11 ++++++++++
 .../apache/hadoop/hdds/utils/db/TestRDBStore.java  | 24 ++++++++++++++++++++++
 .../src/main/proto/OmClientProtocol.proto          |  1 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  6 +++++-
 5 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index f0096ed..2ac2bdc 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -187,4 +187,14 @@ public interface DBStore extends AutoCloseable, 
BatchOperationHandler {
    */
   DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
       throws SequenceNumberNotFoundException;
+
+  /**
+   * Get limited data written to DB since a specific sequence number.
+   * @param sequenceNumber
+   * @param limitCount
+   * @return
+   * @throws SequenceNumberNotFoundException
+   */
+  DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
+      throws SequenceNumberNotFoundException;
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index b50b462..eb71ec1 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -382,7 +382,15 @@ public class RDBStore implements DBStore {
   @Override
   public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
       throws SequenceNumberNotFoundException {
+    return getUpdatesSince(sequenceNumber, Long.MAX_VALUE);
+  }
 
+  @Override
+  public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
+      throws SequenceNumberNotFoundException {
+    if (limitCount <= 0) {
+      throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
+    }
     DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
     try {
       TransactionLogIterator transactionLogIterator =
@@ -415,6 +423,9 @@ public class RDBStore implements DBStore {
         }
         dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
             result.sequenceNumber());
+        if (currSequenceNumber - sequenceNumber >= limitCount) {
+          break;
+        }
         transactionLogIterator.next();
       }
     } catch (RocksDBException e) {
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
index 34d348f..f95a8ff 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
@@ -349,6 +349,30 @@ public class TestRDBStore {
   }
 
   @Test
+  public void testGetDBUpdatesSinceWithLimitCount() throws Exception {
+
+    try (RDBStore newStore =
+             new RDBStore(folder.newFolder(), options, configSet)) {
+
+      try (Table firstTable = newStore.getTable(families.get(1))) {
+        firstTable.put(
+            org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key1"),
+            org.apache.commons.codec.binary.StringUtils
+                .getBytesUtf16("Value1"));
+        firstTable.put(
+            org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"),
+            org.apache.commons.codec.binary.StringUtils
+                .getBytesUtf16("Value2"));
+      }
+      Assert.assertTrue(
+          newStore.getDb().getLatestSequenceNumber() == 2);
+
+      DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0, 1);
+      Assert.assertEquals(1, dbUpdatesSince.getData().size());
+    }
+  }
+
+  @Test
   public void testDowngrade() throws Exception {
 
     // Write data to current DB which has 6 column families at the time of
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 1dd922c..e69f08d 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1098,6 +1098,7 @@ message ServiceListRequest {
 
 message DBUpdatesRequest {
     required uint64 sequenceNumber = 1;
+    optional uint64 limitCount = 2;
 }
 
 message ServiceListResponse {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 1640808..900babd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3489,8 +3489,12 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   public DBUpdates getDBUpdates(
       DBUpdatesRequest dbUpdatesRequest)
       throws SequenceNumberNotFoundException {
+    long limitCount = Long.MAX_VALUE;
+    if (dbUpdatesRequest.hasLimitCount()) {
+      limitCount = dbUpdatesRequest.getLimitCount();
+    }
     DBUpdatesWrapper updatesSince = metadataManager.getStore()
-        .getUpdatesSince(dbUpdatesRequest.getSequenceNumber());
+        .getUpdatesSince(dbUpdatesRequest.getSequenceNumber(), limitCount);
     DBUpdates dbUpdates = new DBUpdates(updatesSince.getData());
     
dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber());
     return dbUpdates;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to