This is an automated email from the ASF dual-hosted git repository.

jacksonyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f4c9b  HDDS-6215. Recon get limited delta updates from OM (#3009)
11f4c9b is described below

commit 11f4c9b4830bd73652a4a72fe9b73cb824f20163
Author: Symious <[email protected]>
AuthorDate: Wed Feb 9 09:36:44 2022 +0800

    HDDS-6215. Recon get limited delta updates from OM (#3009)
    
    * HDDS-6125. Recon get limited delta updates from OM
    
    * HDDS-6215. Fix unit test
    
    * trigger new CI check
    
    * HDDS-6215. Fix typo
    
    * trigger new CI check
    
    Co-authored-by: Symious <[email protected]>
---
 .../common/src/main/resources/ozone-default.xml    | 18 +++++
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |  7 ++
 .../spi/impl/OzoneManagerServiceProviderImpl.java  | 49 +++++++++++-
 .../impl/TestOzoneManagerServiceProviderImpl.java  | 86 +++++++++++++++++++++-
 4 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2f9d01d..92ebc6b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2570,6 +2570,24 @@
     </description>
   </property>
   <property>
+    <name>recon.om.delta.update.limit</name>
+    <value>2000</value>
+    <tag>OZONE, RECON</tag>
+    <description>
+      Recon each time get a limited delta updates from OM.
+      The actual fetched data might be larger than this limit.
+    </description>
+  </property>
+  <property>
+    <name>recon.om.delta.update.loop.limit</name>
+    <value>10</value>
+    <tag>OZONE, RECON</tag>
+    <description>
+      The sync between Recon and OM consists of several small
+      fetch loops.
+    </description>
+  </property>
+  <property>
     <name>ozone.recon.scm.container.threshold</name>
     <value>100</value>
     <tag>OZONE, RECON, SCM</tag>
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index dd9f0c9..d23ffe9 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -96,6 +96,13 @@ public final class  ReconServerConfigKeys {
   public static final String RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM =
       "recon.om.snapshot.task.flush.param";
 
+  public static final String RECON_OM_DELTA_UPDATE_LIMIT =
+      "recon.om.delta.update.limit";
+  public static final long RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT = 2000;
+  public static final String RECON_OM_DELTA_UPDATE_LOOP_LIMIT =
+      "recon.om.delta.update.loop.limit";
+  public static final int RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT = 10;
+
   public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
       "ozone.recon.task.thread.count";
   public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5;
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 021d014..288b26f 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -71,6 +71,10 @@ import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT;
 import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LEADER;
 import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
@@ -105,6 +109,9 @@ public class OzoneManagerServiceProviderImpl
   private ReconUtils reconUtils;
   private OzoneManagerSyncMetrics metrics;
 
+  private long deltaUpdateLimit;
+  private int deltaUpdateLoopLimit;
+
   /**
    * OM Snapshot related task names.
    */
@@ -145,6 +152,12 @@ public class OzoneManagerServiceProviderImpl
     String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
         .OZONE_OM_HTTPS_ADDRESS_KEY);
 
+    long deltaUpdateLimits = configuration.getLong(RECON_OM_DELTA_UPDATE_LIMIT,
+        RECON_OM_DELTA_UPDATE_LIMIT_DEFUALT);
+    int deltaUpdateLoopLimits = configuration.getInt(
+        RECON_OM_DELTA_UPDATE_LOOP_LIMIT,
+        RECON_OM_DELTA_UPDATE_LOOP_LIMIT_DEFUALT);
+
     omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
         OZONE_RECON_OM_SNAPSHOT_DB_DIR);
 
@@ -176,6 +189,8 @@ public class OzoneManagerServiceProviderImpl
     this.ozoneManagerClient = ozoneManagerClient;
     this.configuration = configuration;
     this.metrics = OzoneManagerSyncMetrics.create();
+    this.deltaUpdateLimit = deltaUpdateLimits;
+    this.deltaUpdateLoopLimit = deltaUpdateLoopLimits;
   }
 
   public void registerOMDBTasks() {
@@ -356,8 +371,38 @@ public class OzoneManagerServiceProviderImpl
   void getAndApplyDeltaUpdatesFromOM(
       long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
       throws IOException, RocksDBException {
+    int loopCount = 0;
+    long originalFromSequenceNumber = fromSequenceNumber;
+    long resultCount = Long.MAX_VALUE;
+    while (loopCount < deltaUpdateLoopLimit &&
+        resultCount >= deltaUpdateLimit) {
+      resultCount = innerGetAndApplyDeltaUpdatesFromOM(fromSequenceNumber,
+          omdbUpdatesHandler);
+      fromSequenceNumber += resultCount;
+      loopCount++;
+    }
+    LOG.info("Delta updates received from OM : {} loops, {} records", 
loopCount,
+        fromSequenceNumber - originalFromSequenceNumber
+    );
+  }
+
+  /**
+   * Get Delta updates from OM through RPC call and apply to local OM DB as
+   * well as accumulate in a buffer.
+   * @param fromSequenceNumber from sequence number to request from.
+   * @param omdbUpdatesHandler OM DB updates handler to buffer updates.
+   * @throws IOException when OM RPC request fails.
+   * @throws RocksDBException when writing to RocksDB fails.
+   */
+  @VisibleForTesting
+  long innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
+      OMDBUpdatesHandler omdbUpdatesHandler)
+      throws IOException, RocksDBException {
+    int recordCount = 0;
     DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
-        .setSequenceNumber(fromSequenceNumber).build();
+        .setSequenceNumber(fromSequenceNumber)
+        .setLimitCount(deltaUpdateLimit)
+        .build();
     DBUpdates dbUpdates = ozoneManagerClient.getDBUpdates(dbUpdatesRequest);
     if (null != dbUpdates) {
       RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
@@ -366,6 +411,7 @@ public class OzoneManagerServiceProviderImpl
       LOG.info("Number of updates received from OM : {}", numUpdates);
       if (numUpdates > 0) {
         metrics.incrNumUpdatesInDeltaTotal(numUpdates);
+        recordCount = numUpdates;
       }
       for (byte[] data : dbUpdates.getData()) {
         try (WriteBatch writeBatch = new WriteBatch(data)) {
@@ -379,6 +425,7 @@ public class OzoneManagerServiceProviderImpl
         }
       }
     }
+    return recordCount;
   }
 
   /**
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index c8d2544..161c035 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -25,12 +25,15 @@ import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializ
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LIMIT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_DELTA_UPDATE_LOOP_LIMIT;
 import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
 import static 
org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmDeltaRequest;
 import static 
org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl.OmSnapshotTaskName.OmSnapshotRequest;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -245,7 +248,7 @@ public class TestOzoneManagerServiceProviderImpl {
         metrics.getAverageNumUpdatesInDeltaRequest().value(), 0.0);
     assertEquals(1, metrics.getNumNonZeroDeltaRequests().value());
 
-    // In this method, we have to assert the "GET" part and the "APPLY" path.
+    // In this method, we have to assert the "GET" path and the "APPLY" path.
 
     // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
     // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
@@ -264,6 +267,76 @@ public class TestOzoneManagerServiceProviderImpl {
   }
 
   @Test
+  public void testGetAndApplyDeltaUpdatesFromOMWithLimit() throws Exception {
+
+    // Writing 2 Keys into a source OM DB and collecting it in a
+    // DBUpdatesWrapper.
+    OMMetadataManager sourceOMMetadataMgr =
+        initializeNewOmMetadataManager(temporaryFolder.newFolder());
+    writeDataToOm(sourceOMMetadataMgr, "key_one");
+    writeDataToOm(sourceOMMetadataMgr, "key_two");
+
+    RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
+    TransactionLogIterator transactionLogIterator = 
rocksDB.getUpdatesSince(0L);
+    DBUpdates[] dbUpdatesWrapper = new DBUpdates[4];
+    int index = 0;
+    while(transactionLogIterator.isValid()) {
+      TransactionLogIterator.BatchResult result =
+          transactionLogIterator.getBatch();
+      result.writeBatch().markWalTerminationPoint();
+      WriteBatch writeBatch = result.writeBatch();
+      dbUpdatesWrapper[index] = new DBUpdates();
+      dbUpdatesWrapper[index].addWriteBatch(writeBatch.data(),
+          result.sequenceNumber());
+      index++;
+      transactionLogIterator.next();
+    }
+
+    // OM Service Provider's Metadata Manager.
+    OMMetadataManager omMetadataManager =
+        initializeNewOmMetadataManager(temporaryFolder.newFolder());
+
+    OzoneConfiguration withLimitConfiguration =
+        new OzoneConfiguration(configuration);
+    withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LIMIT, 1);
+    withLimitConfiguration.setLong(RECON_OM_DELTA_UPDATE_LOOP_LIMIT, 3);
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(withLimitConfiguration,
+            getTestReconOmMetadataManager(omMetadataManager,
+                temporaryFolder.newFolder()),
+            getMockTaskController(), new ReconUtils(),
+            getMockOzoneManagerClientWith4Updates(dbUpdatesWrapper[0],
+                dbUpdatesWrapper[1], dbUpdatesWrapper[2], 
dbUpdatesWrapper[3]));
+
+    OMDBUpdatesHandler updatesHandler =
+        new OMDBUpdatesHandler(omMetadataManager);
+    ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
+        0L, updatesHandler);
+
+    OzoneManagerSyncMetrics metrics = ozoneManagerServiceProvider.getMetrics();
+    assertEquals(1.0,
+        metrics.getAverageNumUpdatesInDeltaRequest().value(), 0.0);
+    assertEquals(3, metrics.getNumNonZeroDeltaRequests().value());
+
+    // In this method, we have to assert the "GET" path and the "APPLY" path.
+
+    // Assert GET path --> verify if the OMDBUpdatesHandler picked up the first
+    // 3 of 4 events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
+    assertEquals(3, updatesHandler.getEvents().size());
+
+    // Assert APPLY path --> Verify if the OM service provider's RocksDB got
+    // the first 3 changes, last change not applied.
+    String fullKey = omMetadataManager.getOzoneKey("sampleVol",
+        "bucketOne", "key_one");
+    assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+        .getKeyTable(getBucketLayout()).isExist(fullKey));
+    fullKey = omMetadataManager.getOzoneKey("sampleVol",
+        "bucketOne", "key_two");
+    assertFalse(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+        .getKeyTable(getBucketLayout()).isExist(fullKey));
+  }
+
+  @Test
   public void testSyncDataFromOMFullSnapshot() throws Exception {
 
     // Empty OM DB to start with.
@@ -364,6 +437,17 @@ public class TestOzoneManagerServiceProviderImpl {
     return ozoneManagerProtocolMock;
   }
 
+  private OzoneManagerProtocol getMockOzoneManagerClientWith4Updates(
+      DBUpdates updates1, DBUpdates updates2, DBUpdates updates3,
+      DBUpdates updates4) throws IOException {
+    OzoneManagerProtocol ozoneManagerProtocolMock =
+        mock(OzoneManagerProtocol.class);
+    when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
+        .DBUpdatesRequest.class))).thenReturn(updates1, updates2, updates3,
+        updates4);
+    return ozoneManagerProtocolMock;
+  }
+
   private BucketLayout getBucketLayout() {
     return BucketLayout.DEFAULT;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to