This is an automated email from the ASF dual-hosted git repository.
ferhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cde7cb7 HDDS-6147. Add ability in OM to get limited delta updates
(#2956)
cde7cb7 is described below
commit cde7cb766199dcba53eef53b3c13c63771eb87ac
Author: Symious <[email protected]>
AuthorDate: Tue Jan 18 15:04:36 2022 +0800
HDDS-6147. Add ability in OM to get limited delta updates (#2956)
---
.../org/apache/hadoop/hdds/utils/db/DBStore.java | 10 +++++++++
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 11 ++++++++++
.../apache/hadoop/hdds/utils/db/TestRDBStore.java | 24 ++++++++++++++++++++++
.../src/main/proto/OmClientProtocol.proto | 1 +
.../org/apache/hadoop/ozone/om/OzoneManager.java | 6 +++++-
5 files changed, 51 insertions(+), 1 deletion(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index f0096ed..2ac2bdc 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -187,4 +187,14 @@ public interface DBStore extends AutoCloseable,
BatchOperationHandler {
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException;
+
+ /**
+ * Get limited data written to DB since a specific sequence number.
+ * @param sequenceNumber
+ * @param limitCount
+ * @return
+ * @throws SequenceNumberNotFoundException
+ */
+ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
+ throws SequenceNumberNotFoundException;
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index b50b462..eb71ec1 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -382,7 +382,15 @@ public class RDBStore implements DBStore {
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {
+ return getUpdatesSince(sequenceNumber, Long.MAX_VALUE);
+ }
+ @Override
+ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
+ throws SequenceNumberNotFoundException {
+ if (limitCount <= 0) {
+ throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
+ }
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try {
TransactionLogIterator transactionLogIterator =
@@ -415,6 +423,9 @@ public class RDBStore implements DBStore {
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
+ if (currSequenceNumber - sequenceNumber >= limitCount) {
+ break;
+ }
transactionLogIterator.next();
}
} catch (RocksDBException e) {
diff --git
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
index 34d348f..f95a8ff 100644
---
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
+++
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
@@ -349,6 +349,30 @@ public class TestRDBStore {
}
@Test
+ public void testGetDBUpdatesSinceWithLimitCount() throws Exception {
+
+ try (RDBStore newStore =
+ new RDBStore(folder.newFolder(), options, configSet)) {
+
+ try (Table firstTable = newStore.getTable(families.get(1))) {
+ firstTable.put(
+ org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key1"),
+ org.apache.commons.codec.binary.StringUtils
+ .getBytesUtf16("Value1"));
+ firstTable.put(
+ org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"),
+ org.apache.commons.codec.binary.StringUtils
+ .getBytesUtf16("Value2"));
+ }
+ Assert.assertTrue(
+ newStore.getDb().getLatestSequenceNumber() == 2);
+
+ DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0, 1);
+ Assert.assertEquals(1, dbUpdatesSince.getData().size());
+ }
+ }
+
+ @Test
public void testDowngrade() throws Exception {
// Write data to current DB which has 6 column families at the time of
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 1dd922c..e69f08d 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1098,6 +1098,7 @@ message ServiceListRequest {
message DBUpdatesRequest {
required uint64 sequenceNumber = 1;
+ optional uint64 limitCount = 2;
}
message ServiceListResponse {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 1640808..900babd 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3489,8 +3489,12 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
public DBUpdates getDBUpdates(
DBUpdatesRequest dbUpdatesRequest)
throws SequenceNumberNotFoundException {
+ long limitCount = Long.MAX_VALUE;
+ if (dbUpdatesRequest.hasLimitCount()) {
+ limitCount = dbUpdatesRequest.getLimitCount();
+ }
DBUpdatesWrapper updatesSince = metadataManager.getStore()
- .getUpdatesSince(dbUpdatesRequest.getSequenceNumber());
+ .getUpdatesSince(dbUpdatesRequest.getSequenceNumber(), limitCount);
DBUpdates dbUpdates = new DBUpdates(updatesSince.getData());
dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber());
return dbUpdates;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]