This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a826133d6 HDDS-8195. RDBStore.getUpdatesSince() throws 
RocksDBException: Requested array size exceeds VM limit (#4459)
2a826133d6 is described below

commit 2a826133d681e3ed8a789c77b6c6f8c622e4c743
Author: devmadhuu <[email protected]>
AuthorDate: Wed Mar 29 03:22:47 2023 +0530

    HDDS-8195. RDBStore.getUpdatesSince() throws RocksDBException: Requested 
array size exceeds VM limit (#4459)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  5 ++++
 .../common/src/main/resources/ozone-default.xml    | 12 ++++++++++
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       | 11 ++++++++-
 .../hadoop/hdds/utils/db/DBUpdatesWrapper.java     |  9 +++++++
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  | 21 +++++++++++++---
 .../apache/hadoop/hdds/utils/db/TestRDBStore.java  | 28 +++++++++++++++++-----
 .../hadoop/hdds/utils/db/TestRDBTableStore.java    |  4 +++-
 .../hdds/utils/db/TestTypedRDBTableStore.java      |  4 +++-
 .../apache/hadoop/ozone/om/helpers/DBUpdates.java  | 10 ++++++++
 .../src/main/proto/OmClientProtocol.proto          |  1 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  1 +
 .../protocolPB/OzoneManagerRequestHandler.java     |  1 +
 .../spi/impl/OzoneManagerServiceProviderImpl.java  | 14 ++++++++---
 .../impl/TestOzoneManagerServiceProviderImpl.java  |  5 ++++
 14 files changed, 111 insertions(+), 15 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index c6be74a9cb..b0560daa9a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -616,6 +616,11 @@ public final class OzoneConfigKeys {
 
   public static final boolean OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT = 
false;
 
+  public static final String OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT =
+      "ozone.om.delta.update.data.size.max.limit";
+  public static final String
+      OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT = "1024MB";
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index ec98bc3e16..a5b8a905da 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2851,6 +2851,18 @@
       The actual fetched data might be larger than this limit.
     </description>
   </property>
+  <property>
+    <name>ozone.om.delta.update.data.size.max.limit</name>
+    <value>1024MB</value>
+    <tag>OM, MANAGEMENT</tag>
+    <description>
+      Recon periodically fetches a limited set of delta updates from OM,
+      starting from a given sequence number. Depending on that sequence
+      number, the OM DB delta may span many log files, and each log batch
+      can be large when clients write and update frequently. To limit heap
+      usage, this caps the size of one DB updates object (default 1 GB).
+    </description>
+  </property>
   <property>
     <name>recon.om.delta.update.loop.limit</name>
     <value>10</value>
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index f30772466e..5cd20ab237 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -45,6 +45,8 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKS
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
@@ -105,6 +107,9 @@ public final class DBStoreBuilder {
   private boolean enableCompactionLog;
   private long maxTimeAllowedForSnapshotInDag;
   private long pruneCompactionDagDaemonRunInterval;
+  // Upper bound, in bytes, on the total size of DB updates data collected
+  // for one delta-update request; passed to RDBStore to limit heap usage.
+  private long maxDbUpdatesSizeThreshold;
 
   /**
    * Create DBStoreBuilder from a generic DBDefinition.
@@ -162,6 +167,10 @@ public final class DBStoreBuilder {
         OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL,
         OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
+
+    this.maxDbUpdatesSizeThreshold = (long) configuration.getStorageSize(
+        OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT,
+        OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT, StorageUnit.BYTES);
   }
 
   private void applyDBDefinition(DBDefinition definition) {
@@ -219,7 +228,7 @@ public final class DBStoreBuilder {
       return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
           registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName,
           enableCompactionLog, maxTimeAllowedForSnapshotInDag,
-          pruneCompactionDagDaemonRunInterval);
+          pruneCompactionDagDaemonRunInterval, maxDbUpdatesSizeThreshold);
     } finally {
       tableConfigs.forEach(TableConfig::close);
     }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBUpdatesWrapper.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBUpdatesWrapper.java
index d2dcc0531b..1ba149f403 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBUpdatesWrapper.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBUpdatesWrapper.java
@@ -30,6 +30,7 @@ public class DBUpdatesWrapper {
   private List<byte[]> dataList = new ArrayList<>();
   private long currentSequenceNumber = -1;
   private long latestSequenceNumber = -1;
+  private boolean isDBUpdateSuccess = true;
 
   public void addWriteBatch(byte[] data, long sequenceNumber) {
     dataList.add(data);
@@ -57,5 +58,13 @@ public class DBUpdatesWrapper {
   public long getLatestSequenceNumber() {
     return latestSequenceNumber;
   }
+
+  public boolean isDBUpdateSuccess() {
+    return isDBUpdateSuccess;
+  }
+
+  public void setDBUpdateSuccess(boolean dbUpdateSuccess) {
+    this.isDBUpdateSuccess = dbUpdateSuccess;
+  }
 }
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 74fe859dd0..7a0cf350f3 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -71,12 +71,18 @@ public class RDBStore implements DBStore {
   private final RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
   private final String dbJmxBeanName;
 
+  // Upper bound on the cumulative write-batch data size gathered by
+  // getUpdatesSince() before it stops iterating, to limit heap usage.
+  private long maxDbUpdatesSizeThreshold;
+
   @VisibleForTesting
   public RDBStore(File dbFile, ManagedDBOptions options,
-                  Set<TableConfig> families) throws IOException {
+                  Set<TableConfig> families, long maxDbUpdatesSizeThreshold)
+      throws IOException {
     this(dbFile, options, new ManagedWriteOptions(), families,
         new CodecRegistry(), false, 1000, null, false,
-        TimeUnit.DAYS.toMillis(1), TimeUnit.HOURS.toMillis(1));
+        TimeUnit.DAYS.toMillis(1), TimeUnit.HOURS.toMillis(1),
+        maxDbUpdatesSizeThreshold);
   }
 
   @SuppressWarnings("parameternumber")
@@ -85,11 +91,13 @@ public class RDBStore implements DBStore {
                   CodecRegistry registry, boolean readOnly, int maxFSSnapshots,
                   String dbJmxBeanNameName, boolean enableCompactionLog,
                   long maxTimeAllowedForSnapshotInDag,
-                  long compactionDagDaemonInterval)
+                  long compactionDagDaemonInterval,
+                  long maxDbUpdatesSizeThreshold)
       throws IOException {
     Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
     Preconditions.checkNotNull(families);
     Preconditions.checkArgument(!families.isEmpty());
+    this.maxDbUpdatesSizeThreshold = maxDbUpdatesSizeThreshold;
     codecRegistry = registry;
     dbLocation = dbFile;
     dbJmxBeanName = dbJmxBeanNameName == null ? dbFile.getName() :
@@ -362,6 +370,7 @@ public class RDBStore implements DBStore {
     if (limitCount <= 0) {
       throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
     }
+    long cumulativeDBUpdateLogBatchSize = 0L;
     DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
     try (ManagedTransactionLogIterator logIterator =
         db.getUpdatesSince(sequenceNumber)) {
@@ -406,6 +415,10 @@ public class RDBStore implements DBStore {
           if (currSequenceNumber - sequenceNumber >= limitCount) {
             break;
           }
+          cumulativeDBUpdateLogBatchSize += result.writeBatch().getDataSize();
+          if (cumulativeDBUpdateLogBatchSize >= maxDbUpdatesSizeThreshold) {
+            break;
+          }
         } finally {
           result.writeBatch().close();
         }
@@ -415,6 +428,7 @@ public class RDBStore implements DBStore {
       LOG.warn("Unable to get delta updates since sequenceNumber {}. "
               + "This exception will be thrown to the client",
           sequenceNumber, e);
+      dbUpdatesWrapper.setDBUpdateSuccess(false);
       // Throw the exception back to Recon. Expect Recon to fall back to
       // full snapshot.
       throw e;
@@ -422,6 +436,7 @@ public class RDBStore implements DBStore {
       LOG.error("Unable to get delta updates since sequenceNumber {}. "
               + "This exception will not be thrown to the client ",
           sequenceNumber, e);
+      dbUpdatesWrapper.setDBUpdateSuccess(false);
     }
     dbUpdatesWrapper.setLatestSequenceNumber(db.getLatestSequenceNumber());
     return dbUpdatesWrapper;
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
index 21b1d53390..7393bd7890 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
@@ -50,6 +50,7 @@ import org.rocksdb.StatsLevel;
  * RDBStore Tests.
  */
 public class TestRDBStore {
+  public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
   private final List<String> families =
       Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
           "First", "Second", "Third",
@@ -74,7 +75,8 @@ public class TestRDBStore {
           new ManagedColumnFamilyOptions());
       configSet.add(newConfig);
     }
-    rdbStore = new RDBStore(tempDir, options, configSet);
+    rdbStore = new RDBStore(tempDir, options, configSet,
+        MAX_DB_UPDATES_SIZE_THRESHOLD);
   }
 
   @AfterEach
@@ -262,7 +264,7 @@ public class TestRDBStore {
 
     RDBStore restoredStoreFromCheckPoint =
         new RDBStore(checkpoint.getCheckpointLocation().toFile(),
-            options, configSet);
+            options, configSet, MAX_DB_UPDATES_SIZE_THRESHOLD);
 
     // Let us make sure that our estimate is not off by 10%
     Assertions.assertTrue(
@@ -318,11 +320,24 @@ public class TestRDBStore {
           org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"),
           org.apache.commons.codec.binary.StringUtils
               .getBytesUtf16("Value2"));
+      firstTable.put(
+          org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key3"),
+          org.apache.commons.codec.binary.StringUtils
+              .getBytesUtf16("Value3"));
+      firstTable.put(
+          org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key4"),
+          org.apache.commons.codec.binary.StringUtils
+              .getBytesUtf16("Value4"));
+      firstTable.put(
+          org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key5"),
+          org.apache.commons.codec.binary.StringUtils
+              .getBytesUtf16("Value5"));
     }
-    Assertions.assertEquals(2, rdbStore.getDb().getLatestSequenceNumber());
+    Assertions.assertEquals(5, rdbStore.getDb().getLatestSequenceNumber());
 
-    DBUpdatesWrapper dbUpdatesSince = rdbStore.getUpdatesSince(0, 1);
-    Assertions.assertEquals(1, dbUpdatesSince.getData().size());
+    DBUpdatesWrapper dbUpdatesSince = rdbStore.getUpdatesSince(0, 5);
+    Assertions.assertEquals(2, dbUpdatesSince.getData().size());
+    Assertions.assertEquals(2, dbUpdatesSince.getCurrentSequenceNumber());
   }
 
   @Test
@@ -352,7 +367,8 @@ public class TestRDBStore {
           new ManagedColumnFamilyOptions());
       configSet.add(newConfig);
     }
-    rdbStore = new RDBStore(rdbStore.getDbLocation(), options, configSet);
+    rdbStore = new RDBStore(rdbStore.getDbLocation(), options, configSet,
+        MAX_DB_UPDATES_SIZE_THRESHOLD);
     for (String family : familiesMinusOne) {
       try (Table table = rdbStore.getTable(family)) {
         Assertions.assertNotNull(table, family + "is null");
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
index c53c34fdb2..486d16ff52 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java
@@ -51,6 +51,7 @@ import org.rocksdb.StatsLevel;
  * Tests for RocksDBTable Store.
  */
 public class TestRDBTableStore {
+  public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
   private static int count = 0;
   private final List<String> families =
       Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
@@ -113,7 +114,8 @@ public class TestRDBTableStore {
       TableConfig newConfig = new TableConfig(name, cfOptions);
       configSet.add(newConfig);
     }
-    rdbStore = new RDBStore(tempDir, options, configSet);
+    rdbStore = new RDBStore(tempDir, options, configSet,
+        MAX_DB_UPDATES_SIZE_THRESHOLD);
   }
 
   @AfterEach
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
index dda38d413d..c6e07a7a99 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
@@ -51,6 +51,7 @@ import org.rocksdb.StatsLevel;
  * Tests for RocksDBTable Store.
  */
 public class TestTypedRDBTableStore {
+  public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
   private static int count = 0;
   private final List<String> families =
       Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
@@ -78,7 +79,8 @@ public class TestTypedRDBTableStore {
           new ManagedColumnFamilyOptions());
       configSet.add(newConfig);
     }
-    rdbStore = new RDBStore(tempDir, options, configSet);
+    rdbStore = new RDBStore(tempDir, options, configSet,
+        MAX_DB_UPDATES_SIZE_THRESHOLD);
 
     codecRegistry = new CodecRegistry();
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/DBUpdates.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/DBUpdates.java
index 71916db6c7..4d9ae34501 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/DBUpdates.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/DBUpdates.java
@@ -31,6 +31,8 @@ public class DBUpdates {
 
   private long latestSequenceNumber = -1L;
 
+  private boolean isDBUpdateSuccess = true;
+
   public DBUpdates() {
     this.dataList = new ArrayList<>();
   }
@@ -65,4 +67,12 @@ public class DBUpdates {
   public long getLatestSequenceNumber() {
     return latestSequenceNumber;
   }
+
+  public boolean isDBUpdateSuccess() {
+    return isDBUpdateSuccess;
+  }
+
+  public void setDBUpdateSuccess(boolean dbUpdateSuccess) {
+    this.isDBUpdateSuccess = dbUpdateSuccess;
+  }
 }
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index faa430ca1d..de5f026960 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1339,6 +1339,7 @@ message DBUpdatesResponse {
     required uint64 sequenceNumber = 1;
     repeated bytes data = 2;
     optional uint64 latestSequenceNumber = 3;
+    optional bool dbUpdateSuccess = 4;
 }
 
 message RangerBGSyncRequest {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 15e13eb2ea..c2e96d24c4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3981,6 +3981,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     DBUpdates dbUpdates = new DBUpdates(updatesSince.getData());
     
dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber());
     dbUpdates.setLatestSequenceNumber(updatesSince.getLatestSequenceNumber());
+    dbUpdates.setDBUpdateSuccess(updatesSince.isDBUpdateSuccess());
     return dbUpdates;
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 03c31d0241..44ec2be5f9 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -358,6 +358,7 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
     }
     builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber());
     
builder.setLatestSequenceNumber(dbUpdatesWrapper.getLatestSequenceNumber());
+    builder.setDbUpdateSuccess(dbUpdatesWrapper.isDBUpdateSuccess());
     return builder.build();
   }
 
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index a3897d9a55..32370d0e84 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -412,8 +412,15 @@ public class OzoneManagerServiceProviderImpl
     long inLoopLatestSequenceNumber;
     while (loopCount < deltaUpdateLoopLimit &&
         deltaUpdateCnt >= deltaUpdateLimit) {
-      innerGetAndApplyDeltaUpdatesFromOM(
-          inLoopStartSequenceNumber, omdbUpdatesHandler);
+      if (!innerGetAndApplyDeltaUpdatesFromOM(
+          inLoopStartSequenceNumber, omdbUpdatesHandler)) {
+        LOG.error(
+            "Retrieve OM DB delta update failed for sequence number : {}, " +
+                "so falling back to full snapshot.", 
inLoopStartSequenceNumber);
+        throw new RocksDBException(
+            "Unable to get delta updates since sequenceNumber - " +
+                inLoopStartSequenceNumber);
+      }
       inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber();
       deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber;
       inLoopStartSequenceNumber = inLoopLatestSequenceNumber;
@@ -433,7 +440,7 @@ public class OzoneManagerServiceProviderImpl
    * @throws RocksDBException when writing to RocksDB fails.
    */
   @VisibleForTesting
-  void innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
+  boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
       OMDBUpdatesHandler omdbUpdatesHandler)
       throws IOException, RocksDBException {
     DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
@@ -469,6 +476,7 @@ public class OzoneManagerServiceProviderImpl
     LOG.info("Number of updates received from OM : {}, " +
             "SequenceNumber diff: {}, SequenceNumber Lag from OM {}.",
         numUpdates, getCurrentOMDBSequenceNumber() - fromSequenceNumber, lag);
+    return null != dbUpdates && dbUpdates.isDBUpdateSuccess();
   }
 
   /**
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index 8f586252c2..25c7d9c407 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -315,6 +315,11 @@ public class TestOzoneManagerServiceProviderImpl {
             getMockOzoneManagerClientWith4Updates(dbUpdatesWrapper[0],
                 dbUpdatesWrapper[1], dbUpdatesWrapper[2], 
dbUpdatesWrapper[3]));
 
+    assertEquals(true, dbUpdatesWrapper[0].isDBUpdateSuccess());
+    assertEquals(true, dbUpdatesWrapper[1].isDBUpdateSuccess());
+    assertEquals(true, dbUpdatesWrapper[2].isDBUpdateSuccess());
+    assertEquals(true, dbUpdatesWrapper[3].isDBUpdateSuccess());
+
     OMDBUpdatesHandler updatesHandler =
         new OMDBUpdatesHandler(omMetadataManager);
     ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to