[ https://issues.apache.org/jira/browse/HDDS-1649?focusedWorklogId=279379&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-279379 ]

ASF GitHub Bot logged work on HDDS-1649:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jul/19 22:17
            Start Date: 18/Jul/19 22:17
    Worklog Time Spent: 10m 
      Work Description: hanishakoneru commented on pull request #948: HDDS-1649. On installSnapshot notification from OM leader, download checkpoint and reload OM state
URL: https://github.com/apache/hadoop/pull/948#discussion_r305137102
 
 

 ##########
 File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 ##########
 @@ -3150,6 +3169,195 @@ public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
     }
   }
 
+  /**
+   * Download and install latest checkpoint from leader OM.
+   * If the downloaded checkpoint's snapshot index is greater than this OM's
+   * last applied transaction index, then re-initialize the OM state via this
+   * checkpoint. Before re-initializing OM state, the OM Ratis server should
+   * be stopped so that no new transactions can be applied.
+   * @param leaderId peerNodeID of the leader OM
+   * @return If checkpoint is installed, return the corresponding termIndex.
+   * Otherwise, return null.
+   */
+  public TermIndex installSnapshot(String leaderId) {
+    if (omSnapshotProvider == null) {
+      LOG.error("OM Snapshot Provider is not configured as there are no peer " +
+          "nodes.");
+      return null;
+    }
+
+    DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
+
+    // Check if current ratis log index is smaller than the downloaded
+    // snapshot index. If yes, proceed by stopping the ratis server so that
+    // the OM state can be re-initialized. If no, then do not proceed with
+    // installSnapshot.
+    long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex();
+    long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
+    if (checkpointSnapshotIndex <= lastAppliedIndex) {
+      LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
+          "applied index: {} is greater than or equal to the checkpoint's " +
+          "snapshot index: {}", leaderId, lastAppliedIndex,
+          checkpointSnapshotIndex);
+      return null;
+    }
+
+    // Pause the State Machine so that no new transactions can be applied.
+    // This action also clears the OM Double Buffer so that if there are any
+    // pending transactions in the buffer, they are discarded.
+    // TODO: The Ratis server should also be paused here. This is required
+    //  because a leader election might happen while the snapshot
+    //  installation is in progress and the new leader might start sending
+    //  append log entries to the ratis server.
+    omRatisServer.getOmStateMachine().pause();
+
+    try {
+      replaceOMDBWithCheckpoint(lastAppliedIndex, omDBcheckpoint);
+    } catch (Exception e) {
+      LOG.error("OM DB checkpoint replacement with new downloaded checkpoint " +
+          "failed.", e);
+      return null;
+    }
+
+    // Reload the OM DB store with the new checkpoint.
+    // Restart (unpause) the state machine and update its last applied index
+    // to the installed checkpoint's snapshot index.
+    try {
+      reloadOMState();
+      omRatisServer.getOmStateMachine().unpause(checkpointSnapshotIndex);
+    } catch (IOException e) {
+      LOG.error("Failed to reload OM state with new DB checkpoint.", e);
+      return null;
+    }
+
+    // TODO: We should only return the snapshotIndex to the leader.
+    //  Should be fixed after RATIS-586
+    TermIndex newTermIndex = TermIndex.newTermIndex(0,
+        checkpointSnapshotIndex);
+
+    return newTermIndex;
+  }
+
+  /**
+   * Download the latest OM DB checkpoint from the leader OM.
+   * @param leaderId OMNodeID of the leader OM node.
+   * @return latest DB checkpoint from leader OM.
+   */
+  private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
+    LOG.info("Downloading checkpoint from leader OM {} and reloading state " +
+        "from the checkpoint.", leaderId);
+
+    try {
+      return omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
+    } catch (IOException e) {
+      LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e);
+    }
+    return null;
+  }
+
+  /**
+   * Replace the current OM DB with the new DB checkpoint.
+   * @param lastAppliedIndex the last applied index in the current OM DB.
+   * @param omDBcheckpoint the new DB checkpoint
+   * @throws Exception
+   */
+  void replaceOMDBWithCheckpoint(long lastAppliedIndex,
+      DBCheckpoint omDBcheckpoint) throws Exception {
+    // Stop the DB first
+    DBStore store = metadataManager.getStore();
+    store.close();
+
+    // Take a backup of the current DB
+    File db = store.getDbLocation();
+    String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
+        lastAppliedIndex + "_" + System.currentTimeMillis();
+    File dbBackup = new File(db.getParentFile(), dbBackupName);
+
+    try {
+      Files.move(db.toPath(), dbBackup.toPath());
+    } catch (IOException e) {
+      LOG.error("Failed to create a backup of the current DB. Aborting " +
+          "snapshot installation.");
+      throw e;
+    }
+
+    // Move the new DB checkpoint into the om metadata dir
+    Path checkpointPath = omDBcheckpoint.getCheckpointLocation();
+    try {
+      Files.move(checkpointPath, db.toPath());
+    } catch (IOException e) {
+      LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
+          "directory {}", checkpointPath, db.toPath());
+      throw e;
 
 Review comment:
   Done
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 279379)
    Time Spent: 10h 40m  (was: 10.5h)

> On installSnapshot notification from OM leader, download checkpoint and 
> reload OM state
> ---------------------------------------------------------------------------------------
>
>                 Key: HDDS-1649
>                 URL: https://issues.apache.org/jira/browse/HDDS-1649
>             Project: Hadoop Distributed Data Store
>          Issue Type: Sub-task
>            Reporter: Hanisha Koneru
>            Assignee: Hanisha Koneru
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>
> Installing a DB checkpoint on the OM involves the following steps:
>  1. When an OM follower receives an installSnapshot notification from the OM 
> leader, it should initiate a new checkpoint on the OM leader and download 
> that checkpoint over HTTP. 
>  2. After downloading the checkpoint, the StateMachine must be paused so that 
> the old OM DB can be replaced with the newly downloaded checkpoint. 
>  3. The OM should be reloaded with the new state. All the services that 
> depend on the OM DB (such as MetadataManager, KeyManager, etc.) must be 
> re-initialized/restarted. 
>  4. Once the OM is ready with the new state, the state machine must be 
> unpaused so that it resumes participating in the Ratis ring.
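
The steps above can be sketched roughly as follows. This is a minimal,
standalone illustration and not the code from the pull request: the
StateMachine interface, the SnapshotInstallSketch class, the backup directory
naming, and the -1 return value are all hypothetical stand-ins. Restoring the
backup and unpausing when a move fails is an added assumption of this sketch;
the patch under review returns null in that case. Downloading the checkpoint
(step 1) and reloading dependent services (step 3) are left out.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class SnapshotInstallSketch {

  /** Hypothetical stand-in for the OM state machine; not the Ozone API. */
  interface StateMachine {
    long lastAppliedIndex();
    void pause();
    void unpause(long newLastAppliedIndex);
  }

  /**
   * Swaps dbDir for an already downloaded checkpoint directory.
   * Returns the installed snapshot index, or -1 if nothing was installed.
   */
  static long install(StateMachine sm, Path dbDir, Path checkpointDir,
      long checkpointIndex) {
    long lastApplied = sm.lastAppliedIndex();
    // Only install a checkpoint that is strictly ahead of the local state.
    if (checkpointIndex <= lastApplied) {
      return -1;
    }

    // Step 2: pause so that no new transactions are applied during the swap.
    sm.pause();

    // Hypothetical backup name: "<db name>.backup_<last applied index>".
    Path backup =
        dbDir.resolveSibling(dbDir.getFileName() + ".backup_" + lastApplied);
    try {
      // Keep the current DB as a backup next to the original location.
      Files.move(dbDir, backup);
      try {
        // Move the checkpoint into the place of the old DB.
        Files.move(checkpointDir, dbDir);
      } catch (IOException e) {
        // Assumption of this sketch: put the old DB back on failure.
        Files.move(backup, dbDir);
        throw e;
      }
    } catch (IOException e) {
      // Resume with the old index so the follower is not left paused.
      sm.unpause(lastApplied);
      return -1;
    }

    // Step 4: resume from the checkpoint's snapshot index.
    sm.unpause(checkpointIndex);
    return checkpointIndex;
  }
}
```

A caller would pass the directory of the live DB, the directory of the
downloaded checkpoint, and the checkpoint's snapshot index. Both directories
are assumed to be on the same file system so that each move is a rename.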



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org
