This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e0d4007 HDDS-6322. Fix Recon getting inccorrect sequenceNumber from
OM (#3090)
e0d4007 is described below
commit e0d4007bb1e94052bcca85ededd388f5c29a2dfa
Author: Symious <[email protected]>
AuthorDate: Tue Feb 15 23:17:05 2022 +0800
HDDS-6322. Fix Recon getting inccorrect sequenceNumber from OM (#3090)
---
.../ozone/recon/TestReconWithOzoneManager.java | 5 ++++
.../spi/impl/OzoneManagerServiceProviderImpl.java | 29 ++++++++++++----------
2 files changed, 21 insertions(+), 13 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java
index 9e21311..ed903b3 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java
@@ -24,6 +24,8 @@ import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SOCKET_TIMEOUT_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
import static
org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
import static
org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
@@ -109,6 +111,9 @@ public class TestReconWithOzoneManager {
OZONE_RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
TimeUnit.MILLISECONDS
);
+ conf.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 2);
+ conf.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 10);
+
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(socketTimeout)
.setConnectionRequestTimeout(connectionTimeout)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 08612e3f..ff73003 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -372,17 +372,21 @@ public class OzoneManagerServiceProviderImpl
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
int loopCount = 0;
- long originalFromSequenceNumber = fromSequenceNumber;
- long resultCount = Long.MAX_VALUE;
+ LOG.info("OriginalFromSequenceNumber : {} ", fromSequenceNumber);
+ long deltaUpdateCnt = Long.MAX_VALUE;
+ long inLoopStartSequenceNumber = fromSequenceNumber;
+ long inLoopLatestSequenceNumber;
while (loopCount < deltaUpdateLoopLimit &&
- resultCount >= deltaUpdateLimit) {
- resultCount = innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber,
+ deltaUpdateCnt >= deltaUpdateLimit) {
+ innerGetAndApplyDeltaUpdatesFromOM(inLoopStartSequenceNumber,
omdbUpdatesHandler);
- fromSequenceNumber += resultCount;
+ inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber();
+ deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber;
+ inLoopStartSequenceNumber = inLoopLatestSequenceNumber;
loopCount++;
}
LOG.info("Delta updates received from OM : {} loops, {} records",
loopCount,
- fromSequenceNumber - originalFromSequenceNumber
+ getCurrentOMDBSequenceNumber() - fromSequenceNumber
);
}
@@ -395,23 +399,21 @@ public class OzoneManagerServiceProviderImpl
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
- long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
+ void innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
- int recordCount = 0;
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber)
.setLimitCount(deltaUpdateLimit)
.build();
DBUpdates dbUpdates = ozoneManagerClient.getDBUpdates(dbUpdatesRequest);
- if (null != dbUpdates) {
+ int numUpdates = 0;
+ if (null != dbUpdates && dbUpdates.getCurrentSequenceNumber() != -1) {
RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
RocksDB rocksDB = rocksDBStore.getDb();
- int numUpdates = dbUpdates.getData().size();
- LOG.info("Number of updates received from OM : {}", numUpdates);
+ numUpdates = dbUpdates.getData().size();
if (numUpdates > 0) {
metrics.incrNumUpdatesInDeltaTotal(numUpdates);
- recordCount = numUpdates;
}
for (byte[] data : dbUpdates.getData()) {
try (WriteBatch writeBatch = new WriteBatch(data)) {
@@ -425,7 +427,8 @@ public class OzoneManagerServiceProviderImpl
}
}
}
- return recordCount;
+ LOG.info("Number of updates received from OM : {}, SequenceNumber diff:
{}",
+ numUpdates, getCurrentOMDBSequenceNumber() - fromSequenceNumber);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]