linyiqun commented on a change in pull request #1613:
URL: https://github.com/apache/ozone/pull/1613#discussion_r532159436



##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.upgrade;
+
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * OM Request used to flush all transactions to disk, take a DB snapshot, and
+ * purge the logs, leaving Ratis in a clean state without unapplied log
+ * entries. This prepares the OM for upgrades/downgrades so that no request
+ * in the log is applied to the database in the old version of the code in one
+ * OM, and the new version of the code on another OM.
+ */
+public class OMPrepareRequest extends OMClientRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPrepareRequest.class);
+
+  // Maximum time to wait for the double buffer to flush all transactions
+  // before returning an error to the caller.
+  private static final Duration DOUBLE_BUFFER_FLUSH_TIMEOUT =
+      Duration.of(5, ChronoUnit.MINUTES);
+  // Time between checks to see if double buffer finished flushing.
+  private static final Duration DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL =
+      Duration.of(1, ChronoUnit.SECONDS);
+
+  public OMPrepareRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(
+      OzoneManager ozoneManager, long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    LOG.info("Received prepare request with log index {}", 
transactionLogIndex);
+
+    OMResponse.Builder responseBuilder =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    responseBuilder.setCmdType(Type.Prepare);
+    OMClientResponse response = null;
+
+    try {
+      // Create response.
+      PrepareResponse omResponse = PrepareResponse.newBuilder()
+              .setTxnID(transactionLogIndex)
+              .build();
+      responseBuilder.setPrepareResponse(omResponse);
+      response = new OMPrepareResponse(responseBuilder.build());
+
+      // Add response to double buffer before clearing logs.
+      // This guarantees the log index of this request will be the same as
+      // the snapshot index in the prepared state.
+      ozoneManagerDoubleBufferHelper.add(response, transactionLogIndex);
+
+      // Wait for outstanding double buffer entries to flush to disk,
+      // so they will not be purged from the log before being persisted to
+      // the DB.
+      // Since the response for this request was added to the double buffer
+      // already, once this index reaches the state machine, we know all
+      // transactions have been flushed.
+      waitForDoubleBufferFlush(ozoneManager, transactionLogIndex);
+
+      OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
+      RaftServerProxy server = (RaftServerProxy) omRatisServer.getServer();
+      RaftServerImpl serverImpl =
+          server.getImpl(omRatisServer.getRaftGroup().getGroupId());
+
+      takeSnapshotAndPurgeLogs(serverImpl);
+
+      // TODO: Create marker file with txn index.
+

Review comment:
        Not a comment for this PR, but another TODO I am thinking about: after this 
prepare request has been executed, the OM should reject subsequent requests 
immediately. How do we implement this? Should we exit the OM and restart it 
using an upgrade flag? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to