This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ed3b699877 HDDS-10026. Remove applyTransactionMap and ratisTransactionMap from OzoneManagerStateMachine. (#5891)
ed3b699877 is described below

commit ed3b699877cdebdb77bf84abc49b4000860f40b0
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Jan 13 23:30:28 2024 -0800

    HDDS-10026. Remove applyTransactionMap and ratisTransactionMap from OzoneManagerStateMachine. (#5891)
---
 .../ozone/AbstractRootedOzoneFileSystemTest.java   |  13 +-
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |   6 -
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |  28 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |   3 +-
 .../ozone/om/ratis/OzoneManagerRatisSnapshot.java  |  34 ---
 .../ozone/om/ratis/OzoneManagerStateMachine.java   | 282 ++++++---------------
 .../metrics/OzoneManagerStateMachineMetrics.java   |  11 -
 .../ozone/om/request/upgrade/OMPrepareRequest.java |  48 ++--
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  13 +-
 .../om/ratis/TestOzoneManagerDoubleBuffer.java     |   3 -
 ...tOzoneManagerDoubleBufferWithDummyResponse.java |  13 +-
 ...TestOzoneManagerDoubleBufferWithOMResponse.java |  30 +--
 .../om/ratis/TestOzoneManagerStateMachine.java     | 157 ++----------
 13 files changed, 165 insertions(+), 476 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
index a3555fcf52..61b0281c65 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/AbstractRootedOzoneFileSystemTest.java
@@ -163,12 +163,11 @@ abstract class AbstractRootedOzoneFileSystemTest {
 
   @AfterAll
   void shutdown() {
-    IOUtils.closeQuietly(client);
+    IOUtils.closeQuietly(fs, userOfs, client);
     // Tear down the cluster after EACH set of parameters
     if (cluster != null) {
       cluster.shutdown();
     }
-    IOUtils.closeQuietly(fs, userOfs);
   }
 
   @BeforeEach
@@ -276,15 +275,7 @@ abstract class AbstractRootedOzoneFileSystemTest {
             -> (RootedOzoneFileSystem) FileSystem.get(conf));
 
     if (useOnlyCache) {
-      if (omRatisEnabled) {
-        cluster.getOzoneManager().getOmRatisServer().getOmStateMachine()
-            .getOzoneManagerDoubleBuffer().stopDaemon();
-      } else {
-        cluster.getOzoneManager().getOmServerProtocol()
-            .getOzoneManagerDoubleBuffer().stopDaemon();
-        cluster.getOzoneManager().getOmServerProtocol()
-            .setShouldFlushCache(false);
-      }
+      
cluster.getOzoneManager().getOmServerProtocol().setShouldFlushCache(omRatisEnabled);
     }
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 11ce62d278..253193c92e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
-import 
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -302,7 +301,6 @@ public class TestOzoneRpcClientWithRatis extends 
TestOzoneRpcClientAbstract {
         .captureLogs(OzoneManagerStateMachine.LOG);
     OzoneManagerStateMachine omSM = getCluster().getOzoneManager()
         .getOmRatisServer().getOmStateMachine();
-    OzoneManagerStateMachineMetrics metrics = omSM.getMetrics();
 
     Thread thread1 = new Thread(() -> {
       try {
@@ -326,8 +324,6 @@ public class TestOzoneRpcClientWithRatis extends 
TestOzoneRpcClientAbstract {
     omSM.getHandler().setInjector(injector);
     thread1.start();
     thread2.start();
-    GenericTestUtils.waitFor(() -> metrics.getApplyTransactionMapSize() > 0,
-        100, 5000);
     Thread.sleep(2000);
     injector.resume();
 
@@ -349,8 +345,6 @@ public class TestOzoneRpcClientWithRatis extends 
TestOzoneRpcClientAbstract {
     }
 
     assertThat(omSMLog.getOutput()).contains("Failed to write, Exception 
occurred");
-    GenericTestUtils.waitFor(() -> metrics.getApplyTransactionMapSize() == 0,
-        100, 5000);
   }
 
   private static class OMRequestHandlerPauseInjector extends FaultInjector {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 96bad99e87..d1d971f3f4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -111,8 +112,7 @@ public final class OzoneManagerDoubleBuffer {
   private final OzoneManagerDoubleBufferMetrics 
ozoneManagerDoubleBufferMetrics;
   private long maxFlushedTransactionsInOneIteration;
 
-  private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;
-
+  private final Consumer<TermIndex> updateLastAppliedIndex;
   private final boolean isRatisEnabled;
   private final boolean isTracingEnabled;
   private final Semaphore unFlushedTransactions;
@@ -124,7 +124,7 @@ public final class OzoneManagerDoubleBuffer {
    */
   public static class Builder {
     private OMMetadataManager mm;
-    private OzoneManagerRatisSnapshot rs;
+    private Consumer<TermIndex> updateLastAppliedIndex = termIndex -> { };
     private boolean isRatisEnabled = false;
     private boolean isTracingEnabled = false;
     private int maxUnFlushedTransactionCount = 0;
@@ -138,9 +138,8 @@ public final class OzoneManagerDoubleBuffer {
       return this;
     }
 
-    public Builder setOzoneManagerRatisSnapShot(
-        OzoneManagerRatisSnapshot omrs) {
-      this.rs = omrs;
+    Builder setUpdateLastAppliedIndex(Consumer<TermIndex> 
updateLastAppliedIndex) {
+      this.updateLastAppliedIndex = updateLastAppliedIndex;
       return this;
     }
 
@@ -176,8 +175,6 @@ public final class OzoneManagerDoubleBuffer {
 
     public OzoneManagerDoubleBuffer build() {
       if (isRatisEnabled) {
-        Preconditions.checkNotNull(rs, "When ratis is enabled, " +
-                "OzoneManagerRatisSnapshot should not be null");
         Preconditions.checkState(maxUnFlushedTransactionCount > 0L,
             "when ratis is enable, maxUnFlushedTransactions " +
                 "should be bigger than 0");
@@ -186,7 +183,7 @@ public final class OzoneManagerDoubleBuffer {
         flushNotifier = new FlushNotifier();
       }
 
-      return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
+      return new OzoneManagerDoubleBuffer(mm, updateLastAppliedIndex, 
isRatisEnabled,
           isTracingEnabled, maxUnFlushedTransactionCount,
           flushNotifier, s3SecretManager, threadPrefix);
     }
@@ -194,7 +191,7 @@ public final class OzoneManagerDoubleBuffer {
 
   @SuppressWarnings("checkstyle:parameternumber")
   private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
-      OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
+      Consumer<TermIndex> updateLastAppliedIndex,
       boolean isRatisEnabled, boolean isTracingEnabled,
       int maxUnFlushedTransactions,
       FlushNotifier flushNotifier, S3SecretManager s3SecretManager,
@@ -205,7 +202,7 @@ public final class OzoneManagerDoubleBuffer {
     this.isTracingEnabled = isTracingEnabled;
     this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
     this.omMetadataManager = omMetadataManager;
-    this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
+    this.updateLastAppliedIndex = updateLastAppliedIndex;
     this.ozoneManagerDoubleBufferMetrics =
         OzoneManagerDoubleBufferMetrics.create();
     this.flushNotifier = flushNotifier;
@@ -335,9 +332,8 @@ public final class OzoneManagerDoubleBuffer {
         .map(Entry::getTermIndex)
         .sorted()
         .collect(Collectors.toList());
-    final List<Long> flushedEpochs = flushedTransactions.stream()
-        .map(TermIndex::getIndex)
-        .collect(Collectors.toList());
+    final int flushedTransactionsSize = flushedTransactions.size();
+    final TermIndex lastTransaction = 
flushedTransactions.get(flushedTransactionsSize - 1);
 
     try (BatchOperation batchOperation = omMetadataManager.getStore()
         .initBatchOperation()) {
@@ -347,7 +343,6 @@ public final class OzoneManagerDoubleBuffer {
       buffer.iterator().forEachRemaining(
           entry -> addCleanupEntry(entry, cleanupEpochs));
 
-      final TermIndex lastTransaction = 
flushedTransactions.get(flushedTransactions.size() - 1);
 
       addToBatchTransactionInfoWithTrace(lastTraceId,
           lastTransaction.getIndex(),
@@ -372,7 +367,6 @@ public final class OzoneManagerDoubleBuffer {
           .forEach(f -> f.complete(null));
     }
 
-    int flushedTransactionsSize = buffer.size();
     flushedTransactionCount.addAndGet(flushedTransactionsSize);
     flushIterations.incrementAndGet();
 
@@ -389,7 +383,7 @@ public final class OzoneManagerDoubleBuffer {
       releaseUnFlushedTransactions(flushedTransactionsSize);
     }
     // update the last updated index in OzoneManagerStateMachine.
-    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+    updateLastAppliedIndex.accept(lastTransaction);
 
     // set metrics.
     updateMetrics(flushedTransactionsSize);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index e6a747b35a..6d7e117ada 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -591,9 +591,10 @@ public final class OzoneManagerRatisServer {
   }
 
   public void stop() {
+    LOG.info("Stopping {} at port {}", this, port);
     try {
+      // Ratis will also close the state machine
       server.close();
-      omStateMachine.close();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java
deleted file mode 100644
index aa0e6d98f8..0000000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om.ratis;
-
-import java.util.List;
-
-/**
- * Functional interface for OM RatisSnapshot.
- */
-
-public interface OzoneManagerRatisSnapshot {
-
-  /**
-   * Update lastAppliedIndex in OzoneManager StateMachine.
-   * @param flushedEpochs - list of ratis transaction indexes which are
-   * flushed to DB.
-   */
-  void updateLastAppliedIndex(List<Long> flushedEpochs);
-}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 46979b6e7f..45767ec7d0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -26,15 +26,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
-import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -66,6 +62,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.TransactionContext;
@@ -73,6 +70,7 @@ import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,15 +101,11 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   private final AtomicInteger statePausedCount = new AtomicInteger(0);
   private final String threadPrefix;
 
-  // Map which contains index and term for the ratis transactions which are
-  // stateMachine entries which are received through applyTransaction.
-  private ConcurrentMap<Long, Long> applyTransactionMap =
-      new ConcurrentSkipListMap<>();
+  /** The last {@link TermIndex} received from {@link 
#notifyTermIndexUpdated(long, long)}. */
+  private volatile TermIndex lastNotifiedTermIndex = TermIndex.valueOf(0, 
RaftLog.INVALID_LOG_INDEX);
+  /** The last index skipped by {@link #notifyTermIndexUpdated(long, long)}. */
+  private volatile long lastSkippedIndex = RaftLog.INVALID_LOG_INDEX;
 
-  // Map which contains index and term for the ratis transactions which are
-  // conf/metadata entries which are received through notifyIndexUpdate.
-  private ConcurrentMap<Long, Long> ratisTransactionMap =
-      new ConcurrentSkipListMap<>();
   private OzoneManagerStateMachineMetrics metrics;
 
 
@@ -177,28 +171,32 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
     ozoneManager.omHAMetricsInit(newLeaderId.toString());
   }
 
-  /**
-   * Called to notify state machine about indexes which are processed
-   * internally by Raft Server, this currently happens when conf entries are
-   * processed in raft Server. This keep state machine to keep a track of index
-   * updates.
-   * @param currentTerm term of the current log entry
-   * @param index index which is being updated
-   */
+  /** Notified by Ratis for non-StateMachine term-index update. */
+  @Override
+  public synchronized void notifyTermIndexUpdated(long currentTerm, long 
newIndex) {
+    final long oldIndex = lastNotifiedTermIndex.getIndex();
+    if (newIndex - oldIndex > 1) {
+      lastSkippedIndex = newIndex - 1;
+    }
+    final TermIndex newTermIndex = TermIndex.valueOf(currentTerm, newIndex);
+    lastNotifiedTermIndex = assertUpdateIncreasingly("lastNotified", 
lastNotifiedTermIndex, newTermIndex);
+  }
+
+  public TermIndex getLastNotifiedTermIndex() {
+    return lastNotifiedTermIndex;
+  }
+
   @Override
-  public void notifyTermIndexUpdated(long currentTerm, long index) {
-    // SnapshotInfo should be updated when the term changes.
-    // The index here refers to the log entry index and the index in
-    // SnapshotInfo represents the snapshotIndex i.e. the index of the last
-    // transaction included in the snapshot. Hence, snaphsotInfo#index is not
-    // updated here.
+  protected synchronized boolean updateLastAppliedTermIndex(TermIndex 
newTermIndex) {
+    assertUpdateIncreasingly("lastApplied", getLastAppliedTermIndex(), 
newTermIndex);
+    return super.updateLastAppliedTermIndex(newTermIndex);
+  }
 
-    // We need to call updateLastApplied here because now in ratis when a
-    // node becomes leader, it is checking stateMachineIndex >=
-    // placeHolderIndex (when a node becomes leader, it writes a conf entry
-    // with some information like its peers and termIndex). So, calling
-    // updateLastApplied updates lastAppliedTermIndex.
-    computeAndUpdateLastAppliedIndex(index, currentTerm, null, false);
+  /** Assert if the given {@link TermIndex} is updated increasingly. */
+  private TermIndex assertUpdateIncreasingly(String name, TermIndex 
oldTermIndex, TermIndex newTermIndex) {
+    Preconditions.checkArgument(newTermIndex.compareTo(oldTermIndex) >= 0,
+        "%s: newTermIndex = %s < oldTermIndex = %s", name, newTermIndex, 
oldTermIndex);
+    return newTermIndex;
   }
 
   /**
@@ -331,7 +329,6 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
           : OMRatisHelper.convertByteStringToOMRequest(
           trx.getStateMachineLogEntry().getLogData());
       final TermIndex termIndex = TermIndex.valueOf(trx.getLogEntry());
-      final long trxLogIndex = termIndex.getIndex();
       // In the current approach we have one single global thread executor.
       // with single thread. Right now this is being done for correctness, as
       // applyTransaction will be run on multiple OM's we want to execute the
@@ -346,72 +343,33 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
       // lastAppliedIndex in OzoneManager StateMachine, even if other
       // executor has completed the transactions with id more.
 
-      // We have 300 transactions, And for each volume we have transactions
-      // of 150. Volume1 transactions 0 - 149 and Volume2 transactions 150 -
-      // 299.
-      // Example: Executor1 - Volume1 - 100 (current completed transaction)
-      // Example: Executor2 - Volume2 - 299 (current completed transaction)
-
-      // Now we have applied transactions of 0 - 100 and 149 - 299. We
-      // cannot update lastAppliedIndex to 299. We need to update it to 100,
-      // since 101 - 149 are not applied. When OM restarts it will
-      // applyTransactions from lastAppliedIndex.
-      // We can update the lastAppliedIndex to 100, and update it to 299,
-      // only after completing 101 - 149. In initial stage, we are starting
-      // with single global executor. Will revisit this when needed.
-
-      // Add the term index and transaction log index to applyTransaction map
-      // . This map will be used to update lastAppliedIndex.
-
-      CompletableFuture<Message> ratisFuture =
-          new CompletableFuture<>();
-      applyTransactionMap.put(trxLogIndex, trx.getLogEntry().getTerm());
-
       //if there are too many pending requests, wait for doubleBuffer flushing
       ozoneManagerDoubleBuffer.acquireUnFlushedTransactions(1);
 
-      CompletableFuture<OMResponse> future = CompletableFuture.supplyAsync(
-          () -> runCommand(request, termIndex), executorService);
-      future.thenApply(omResponse -> {
-        if (!omResponse.getSuccess()) {
-          // When INTERNAL_ERROR or METADATA_ERROR it is considered as
-          // critical error and terminate the OM. Considering INTERNAL_ERROR
-          // also for now because INTERNAL_ERROR is thrown for any error
-          // which is not type OMException.
-
-          // Not done future with completeExceptionally because if we do
-          // that OM will still continue applying transaction until next
-          // snapshot. So in OM case if a transaction failed with un
-          // recoverable error and if we wait till snapshot to terminate
-          // OM, then if some client requested the read transaction of the
-          // failed request, there is a chance we shall give wrong result.
-          // So, to avoid these kind of issue, we should terminate OM here.
-          if (omResponse.getStatus() == INTERNAL_ERROR) {
-            terminate(omResponse, OMException.ResultCodes.INTERNAL_ERROR);
-          } else if (omResponse.getStatus() == METADATA_ERROR) {
-            terminate(omResponse, OMException.ResultCodes.METADATA_ERROR);
-          }
-        }
-
-        // For successful response and for all other errors which are not
-        // critical, we can complete future normally.
-        ratisFuture.complete(OMRatisHelper.convertResponseToMessage(
-            omResponse));
-        return ratisFuture;
-      });
-      return ratisFuture;
+      return CompletableFuture.supplyAsync(() -> runCommand(request, 
termIndex), executorService)
+          .thenApply(this::processResponse);
     } catch (Exception e) {
       return completeExceptionally(e);
     }
   }
 
-  /**
-   * Terminate OM.
-   * @param omResponse
-   * @param resultCode
-   */
-  private void terminate(OMResponse omResponse,
-      OMException.ResultCodes resultCode) {
+  private Message processResponse(OMResponse omResponse) {
+    if (!omResponse.getSuccess()) {
+      // INTERNAL_ERROR or METADATA_ERROR are considered as critical errors.
+      // In such cases, OM must be terminated instead of completing the future 
exceptionally,
+      // Otherwise, OM may continue applying transactions which leads to an 
inconsistent state.
+      if (omResponse.getStatus() == INTERNAL_ERROR) {
+        terminate(omResponse, OMException.ResultCodes.INTERNAL_ERROR);
+      } else if (omResponse.getStatus() == METADATA_ERROR) {
+        terminate(omResponse, OMException.ResultCodes.METADATA_ERROR);
+      }
+    }
+
+    // For successful response and non-critical errors, convert the response.
+    return OMRatisHelper.convertResponseToMessage(omResponse);
+  }
+
+  private static void terminate(OMResponse omResponse, OMException.ResultCodes 
resultCode) {
     OMException exception = new OMException(omResponse.getMessage(),
         resultCode);
     String errorMessage = "OM Ratis Server has received unrecoverable " +
@@ -474,7 +432,7 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
             OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT);
     return new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(ozoneManager.getMetadataManager())
-        .setOzoneManagerRatisSnapShot(this::updateLastAppliedIndex)
+        .setUpdateLastAppliedIndex(this::updateLastAppliedTermIndex)
         .setmaxUnFlushedTransactionCount(maxUnflushedTransactionSize)
         .setThreadPrefix(threadPrefix)
         .setS3SecretManager(ozoneManager.getS3SecretManager())
@@ -493,15 +451,35 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
    */
   @Override
   public long takeSnapshot() throws IOException {
-    LOG.info("Current Snapshot Index {}", getLastAppliedTermIndex());
-    TermIndex lastTermIndex = getLastAppliedTermIndex();
-    final TransactionInfo build = TransactionInfo.valueOf(lastTermIndex);
-    ozoneManager.setTransactionInfo(build);
-    Table<String, TransactionInfo> txnInfoTable =
-        ozoneManager.getMetadataManager().getTransactionInfoTable();
-    txnInfoTable.put(TRANSACTION_INFO_KEY, build);
+    // wait until applied == skipped
+    while (getLastAppliedTermIndex().getIndex() < lastSkippedIndex) {
+      if (ozoneManager.isStopped()) {
+        throw new IOException("OzoneManager is already stopped: " + 
ozoneManager.getNodeDetails());
+      }
+      try {
+        ozoneManagerDoubleBuffer.awaitFlush();
+      } catch (InterruptedException e) {
+        throw IOUtils.toInterruptedIOException("Interrupted 
ozoneManagerDoubleBuffer.awaitFlush", e);
+      }
+    }
+
+    return takeSnapshotImpl();
+  }
+
+  private synchronized long takeSnapshotImpl() throws IOException {
+    final TermIndex applied = getLastAppliedTermIndex();
+    final TermIndex notified = getLastNotifiedTermIndex();
+    final TermIndex snapshot = applied.compareTo(notified) > 0 ? applied : 
notified;
+    LOG.info(" applied = {}", applied);
+    LOG.info(" skipped = {}", lastSkippedIndex);
+    LOG.info("notified = {}", notified);
+    LOG.info("snapshot = {}", snapshot);
+
+    final TransactionInfo transactionInfo = TransactionInfo.valueOf(snapshot);
+    ozoneManager.setTransactionInfo(transactionInfo);
+    
ozoneManager.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY,
 transactionInfo);
     ozoneManager.getMetadataManager().getStore().flushDB();
-    return lastTermIndex.getIndex();
+    return snapshot.getIndex();
   }
 
   /**
@@ -541,13 +519,13 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     // OM should be shutdown as the StateMachine has shutdown.
-    LOG.info("StateMachine has shutdown. Shutdown OzoneManager if not " +
-        "already shutdown.");
     if (!ozoneManager.isStopped()) {
+      LOG.info("Stopping {}. Shutdown also OzoneManager {}.", this, 
ozoneManager);
       ozoneManager.shutDown("OM state machine is shutdown by Ratis server");
     } else {
+      LOG.info("Stopping {}.", this);
       stop();
     }
   }
@@ -614,87 +592,6 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
     return omResponse;
   }
 
-  /**
-   * Update lastAppliedIndex term and it's corresponding term in the
-   * stateMachine.
-   * @param flushedEpochs
-   */
-  public void updateLastAppliedIndex(List<Long> flushedEpochs) {
-    Preconditions.checkArgument(flushedEpochs.size() > 0);
-    computeAndUpdateLastAppliedIndex(
-        flushedEpochs.get(flushedEpochs.size() - 1), -1L, flushedEpochs, true);
-  }
-
-  /**
-   * Update State machine lastAppliedTermIndex.
-   * @param lastFlushedIndex
-   * @param currentTerm
-   * @param flushedEpochs - list of ratis transactions flushed to DB. If it
-   * is just one index and term, this can be set to null.
-   * @param checkMap - if true check applyTransactionMap, ratisTransaction
-   * Map and update lastAppliedTermIndex accordingly, else check
-   * lastAppliedTermIndex and update it.
-   */
-  private synchronized void computeAndUpdateLastAppliedIndex(
-      long lastFlushedIndex, long currentTerm, List<Long> flushedEpochs,
-      boolean checkMap) {
-    if (checkMap) {
-      List<Long> flushedTrans = new ArrayList<>(flushedEpochs);
-      Long appliedTerm = null;
-      long appliedIndex = -1;
-      for (long i = getLastAppliedTermIndex().getIndex() + 1; ; i++) {
-        if (flushedTrans.contains(i)) {
-          appliedIndex = i;
-          final Long removed = applyTransactionMap.remove(i);
-          appliedTerm = removed;
-          flushedTrans.remove(i);
-        } else if (ratisTransactionMap.containsKey(i)) {
-          final Long removed = ratisTransactionMap.remove(i);
-          appliedTerm = removed;
-          appliedIndex = i;
-        } else {
-          // Add remaining which are left in flushedEpochs to
-          // ratisTransactionMap to be considered further.
-          for (long epoch : flushedTrans) {
-            ratisTransactionMap.put(epoch, applyTransactionMap.remove(epoch));
-          }
-          if (LOG.isDebugEnabled()) {
-            if (!flushedTrans.isEmpty()) {
-              LOG.debug("ComputeAndUpdateLastAppliedIndex due to SM added " +
-                  "to map remaining {}", flushedTrans);
-            }
-          }
-          break;
-        }
-      }
-      if (appliedTerm != null) {
-        updateLastAppliedTermIndex(appliedTerm, appliedIndex);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("ComputeAndUpdateLastAppliedIndex due to SM is {}",
-              getLastAppliedTermIndex());
-        }
-      }
-    } else {
-      if (getLastAppliedTermIndex().getIndex() + 1 == lastFlushedIndex) {
-        updateLastAppliedTermIndex(currentTerm, lastFlushedIndex);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("ComputeAndUpdateLastAppliedIndex due to notifyIndex {}",
-              getLastAppliedTermIndex());
-        }
-      } else {
-        ratisTransactionMap.put(lastFlushedIndex, currentTerm);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("ComputeAndUpdateLastAppliedIndex due to notifyIndex " +
-              "added to map. Passed Term {} index {}, where as lastApplied " +
-              "Index {}", currentTerm, lastFlushedIndex,
-              getLastAppliedTermIndex());
-        }
-      }
-    }
-    this.metrics.updateApplyTransactionMapSize(applyTransactionMap.size());
-    this.metrics.updateRatisTransactionMapSize(ratisTransactionMap.size());
-  }
-
   public void loadSnapshotInfoFromDB() throws IOException {
     // This is done, as we have a check in Ratis for not throwing
     // LeaderNotReadyException, it checks stateMachineIndex >= raftLog
@@ -752,28 +649,11 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
     ozoneManagerDoubleBuffer.stop();
     HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
     HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, 
TimeUnit.SECONDS);
-    LOG.info("applyTransactionMap size {} ", applyTransactionMap.size());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("applyTransactionMap {}",
-          applyTransactionMap.keySet().stream().map(Object::toString)
-              .collect(Collectors.joining(",")));
-    }
-    LOG.info("ratisTransactionMap size {}", ratisTransactionMap.size());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("ratisTransactionMap {}",
-          ratisTransactionMap.keySet().stream().map(Object::toString)
-            .collect(Collectors.joining(",")));
-    }
     if (metrics != null) {
       metrics.unRegister();
     }
   }
 
-  @VisibleForTesting
-  void addApplyTransactionTermIndex(long term, long index) {
-    applyTransactionMap.put(index, term);
-  }
-
   /**
    * Wait until both buffers are flushed.  This is used in cases like
    * "follower bootstrap tarball creation" where the rocksDb for the active
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
index 8ab4fe883a..51d26ef7ac 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.om.ratis.metrics;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -63,16 +62,6 @@ public final class OzoneManagerStateMachineMetrics 
implements MetricsSource {
     }
   }
 
-  @VisibleForTesting
-  public long getApplyTransactionMapSize() {
-    return applyTransactionMapSize.value();
-  }
-
-  @VisibleForTesting
-  public long getRatisTransactionMapSize() {
-    return ratisTransactionMapSize.value();
-  }
-
   public void updateApplyTransactionMapSize(long size) {
     this.applyTransactionMapSize.incr(
         Math.negateExact(applyTransactionMapSize.value()) + size);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index 053daa1c7c..a814459050 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -36,6 +37,7 @@ import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,16 +102,17 @@ public class OMPrepareRequest extends OMClientRequest {
 
       OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
       final RaftServer.Division division = omRatisServer.getServerDivision();
-
-      // Wait for outstanding double buffer entries to flush to disk,
-      // so they will not be purged from the log before being persisted to
-      // the DB.
-      // Since the response for this request was added to the double buffer
-      // already, once this index reaches the state machine, we know all
-      // transactions have been flushed.
-      waitForLogIndex(transactionLogIndex, ozoneManager, division,
+      final OzoneManagerStateMachine stateMachine = (OzoneManagerStateMachine) 
division.getStateMachine();
+
+      // Wait for outstanding double buffer entries
+      // - to be flushed to db, and
+      // - to be notified by Ratis.
+      // The log index returned, will be used as the prepare index, is the 
last Ratis commit index
+      // which can be higher than the transactionLogIndex of this request.
+      final long prepareIndex = waitForLogIndex(transactionLogIndex, 
ozoneManager, stateMachine,
           flushTimeout, flushCheckInterval);
-      takeSnapshotAndPurgeLogs(transactionLogIndex, division);
+      Preconditions.assertTrue(prepareIndex >= transactionLogIndex);
+      takeSnapshotAndPurgeLogs(prepareIndex, division);
 
       // Save prepare index to a marker file, so if the OM restarts,
       // it will remain in prepare mode as long as the file exists and its
@@ -150,11 +153,15 @@ public class OMPrepareRequest extends OMClientRequest {
   }
 
   /**
-   * Waits for the specified index to be flushed to the state machine on
-   * disk, and to be applied to Ratis's state machine.
+   * Waits for the specified index to be applied to {@link 
OzoneManagerStateMachine}.
+   * Note that
+   * - the applied index is updated after the transaction is flushed to db.
+   * - after a transaction (i) is committed, ratis will append another 
ratis-metadata transaction (i+1).
+   *
+   * @return the last Ratis commit index
    */
-  private static void waitForLogIndex(long minOMDBFlushIndex,
-      OzoneManager om, RaftServer.Division division,
+  private static long waitForLogIndex(long minOMDBFlushIndex,
+      OzoneManager om, OzoneManagerStateMachine stateMachine,
       Duration flushTimeout, Duration flushCheckInterval)
       throws InterruptedException, IOException {
 
@@ -168,8 +175,10 @@ public class OMPrepareRequest extends OMClientRequest {
     // snapshot is taken.
     // If we purge logs without waiting for this index, it may not make it to
     // the RocksDB snapshot, and then the log entry is lost on this OM.
-    long minRatisStateMachineIndex = minOMDBFlushIndex + 1;
+    long minRatisStateMachineIndex = minOMDBFlushIndex + 1; // for the 
ratis-metadata transaction
     long lastRatisCommitIndex = RaftLog.INVALID_LOG_INDEX;
+
+    // Wait OM state machine to apply the given index.
     long lastOMDBFlushIndex = RaftLog.INVALID_LOG_INDEX;
 
     LOG.info("{} waiting for index {} to flush to OM DB and index {} to flush" 
+
@@ -184,8 +193,7 @@ public class OMPrepareRequest extends OMClientRequest {
           lastOMDBFlushIndex);
 
       // Check ratis state machine.
-      lastRatisCommitIndex =
-          division.getStateMachine().getLastAppliedTermIndex().getIndex();
+      lastRatisCommitIndex = 
stateMachine.getLastNotifiedTermIndex().getIndex();
       ratisStateMachineApplied = (lastRatisCommitIndex >=
           minRatisStateMachineIndex);
       LOG.debug("{} Current Ratis state machine transaction index {}.",
@@ -211,6 +219,7 @@ public class OMPrepareRequest extends OMClientRequest {
           flushTimeout.getSeconds(), lastRatisCommitIndex,
           minRatisStateMachineIndex));
     }
+    return lastRatisCommitIndex;
   }
 
   /**
@@ -226,6 +235,7 @@ public class OMPrepareRequest extends OMClientRequest {
       RaftServer.Division division) throws IOException {
     StateMachine stateMachine = division.getStateMachine();
     long snapshotIndex = stateMachine.takeSnapshot();
+    LOG.info("takeSnapshot at {} for prepareIndex {}", snapshotIndex, 
prepareIndex);
 
     if (snapshotIndex < prepareIndex) {
       throw new IOException(String.format("OM DB snapshot index %d is less " +
@@ -245,12 +255,6 @@ public class OMPrepareRequest extends OMClientRequest {
               "match specified purge index {}. ", actualPurgeIndex,
             snapshotIndex);
       }
-
-      if (actualPurgeIndex < prepareIndex) {
-        throw new IOException(String.format("Actual purge index %d is less " +
-          "than prepare index %d. Some required logs may not have" +
-            " been removed.", actualPurgeIndex, prepareIndex));
-      }
     } catch (ExecutionException e) {
       // Ozone manager error handler does not respect exception chaining and
       // only displays the message of the top level exception.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index bebb2b4324..a4ec2c2f20 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -115,11 +115,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
     } else {
       this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder()
           .setOmMetadataManager(ozoneManager.getMetadataManager())
-          // Do nothing.
-          // For OM NON-HA code, there is no need to save transaction index.
-          // As we wait until the double buffer flushes DB to disk.
-          .setOzoneManagerRatisSnapShot((i) -> {
-          })
           .enableRatis(isRatisEnabled)
           .enableTracing(TracingUtil.isTracingEnabled(
               ozoneManager.getConfiguration()))
@@ -367,13 +362,11 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
     ozoneManagerDoubleBuffer.awaitFlush();
   }
 
-  @VisibleForTesting
-  public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() {
-    return ozoneManagerDoubleBuffer;
-  }
-
   @VisibleForTesting
   public void setShouldFlushCache(boolean shouldFlushCache) {
+    if (ozoneManagerDoubleBuffer != null) {
+      ozoneManagerDoubleBuffer.stopDaemon();
+    }
     this.shouldFlushCache = shouldFlushCache;
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
index 859d54eb5f..202234a0d4 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
@@ -135,15 +135,12 @@ class TestOzoneManagerDoubleBuffer {
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
     doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
-    OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> {
-    };
 
     flushNotifier = new OzoneManagerDoubleBuffer.FlushNotifier();
     spyFlushNotifier = spy(flushNotifier);
     doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(omMetadataManager)
         .setS3SecretManager(secretManager)
-        .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
         .setmaxUnFlushedTransactionCount(1000)
         .enableRatis(true)
         .setFlushNotifier(spyFlushNotifier)
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
index bdff3c0467..635f86f3ab 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
@@ -64,7 +64,6 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
   private OMMetadataManager omMetadataManager;
   private OzoneManagerDoubleBuffer doubleBuffer;
   private final AtomicLong trxId = new AtomicLong(0);
-  private long lastAppliedIndex;
   private long term = 1L;
   @TempDir
   private Path folder;
@@ -76,12 +75,8 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
         folder.toAbsolutePath().toString());
     omMetadataManager =
         new OmMetadataManagerImpl(configuration, null);
-    OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> {
-      lastAppliedIndex = index.get(index.size() - 1);
-    };
     doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(omMetadataManager)
-        .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
         .setmaxUnFlushedTransactionCount(10000)
         .enableRatis(true)
         .build();
@@ -133,15 +128,11 @@ public class 
TestOzoneManagerDoubleBufferWithDummyResponse {
         OzoneManagerDoubleBufferMetrics.create();
     assertEquals(metrics, metricsCopy);
 
-    // Check lastAppliedIndex is updated correctly or not.
-    assertEquals(bucketCount, lastAppliedIndex);
-
-
     TransactionInfo transactionInfo =
         omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    // Check lastAppliedIndex is updated correctly or not.
     assertNotNull(transactionInfo);
-
-    assertEquals(lastAppliedIndex, transactionInfo.getTransactionIndex());
+    assertEquals(bucketCount, transactionInfo.getTransactionIndex());
     assertEquals(term, transactionInfo.getTerm());
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 2d58088e25..7178868dcf 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -87,8 +87,6 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
   private OMMetadataManager omMetadataManager;
   private OzoneManagerDoubleBuffer doubleBuffer;
   private final AtomicLong trxId = new AtomicLong(0);
-  private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
-  private volatile long lastAppliedIndex;
   private long term = 1L;
 
   @TempDir
@@ -109,12 +107,8 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
     auditLogger = mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
-    ozoneManagerRatisSnapshot = index -> {
-      lastAppliedIndex = index.get(index.size() - 1);
-    };
     doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(omMetadataManager)
-        .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
         .setmaxUnFlushedTransactionCount(100000)
         .enableRatis(true)
         .build();
@@ -198,16 +192,21 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
     checkDeletedBuckets(deleteBucketQueue);
 
     // Check lastAppliedIndex is updated correctly or not.
-    GenericTestUtils.waitFor(() ->
-        bucketCount + deleteCount + 1 == lastAppliedIndex,
+    final long expectedIndex = bucketCount + deleteCount + 1;
+    GenericTestUtils.waitFor(() -> assertTransactionInfo(expectedIndex),
         100, 30000);
+  }
 
-    TransactionInfo transactionInfo =
-        omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
-    assertNotNull(transactionInfo);
-
-    assertEquals(lastAppliedIndex, transactionInfo.getTransactionIndex());
-    assertEquals(term, transactionInfo.getTerm());
+  private boolean assertTransactionInfo(long lastAppliedIndex) {
+    final TransactionInfo info;
+    try {
+      info = 
omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    } catch (IOException e) {
+      return false;
+    }
+    return info != null
+        && info.getTransactionIndex() == lastAppliedIndex
+        && info.getTerm() == term;
   }
 
   /**
@@ -275,7 +274,8 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
     // running in parallel, so lastAppliedIndex cannot be always
     // total transaction count. So, just checking here whether it is less
     // than total transaction count.
-    assertThat(lastAppliedIndex).isLessThanOrEqualTo(bucketCount + deleteCount 
+ 2);
+    final TransactionInfo info = 
omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    assertThat(info.getTransactionIndex()).isLessThanOrEqualTo(bucketCount + 
deleteCount + 2);
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index bb9e4ee9b9..3ea7c51264 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -36,14 +36,14 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInf
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -91,154 +91,43 @@ public class TestOzoneManagerStateMachine {
     when(ozoneManager.getConfiguration()).thenReturn(conf);
     ozoneManagerStateMachine =
         new OzoneManagerStateMachine(ozoneManagerRatisServer, false);
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
+  }
+
+  static void assertTermIndex(long expectedTerm, long expectedIndex, TermIndex 
computed) {
+    assertEquals(expectedTerm, computed.getTerm());
+    assertEquals(expectedIndex, computed.getIndex());
   }
 
   @Test
   public void testLastAppliedIndex() {
-
-    // Happy scenario.
+    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
+    assertTermIndex(0, RaftLog.INVALID_LOG_INDEX, 
ozoneManagerStateMachine.getLastAppliedTermIndex());
+    assertTermIndex(0, 0, ozoneManagerStateMachine.getLastNotifiedTermIndex());
 
     // Conf/metadata transaction.
     ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1);
-    assertEquals(0, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(1, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-    List<Long> flushedEpochs = new ArrayList<>();
-
-    // Add some apply transactions.
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0, 2);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0, 3);
-
-    flushedEpochs.add(2L);
-    flushedEpochs.add(3L);
+    assertTermIndex(0, RaftLog.INVALID_LOG_INDEX, 
ozoneManagerStateMachine.getLastAppliedTermIndex());
+    assertTermIndex(0, 1, ozoneManagerStateMachine.getLastNotifiedTermIndex());
 
     // call update last applied index
-    ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
+    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(0, 
2));
+    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(0, 
3));
 
-    assertEquals(0, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(3, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
+    assertTermIndex(0, 3, ozoneManagerStateMachine.getLastAppliedTermIndex());
+    assertTermIndex(0, 1, ozoneManagerStateMachine.getLastNotifiedTermIndex());
 
     // Conf/metadata transaction.
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 4L);
+    ozoneManagerStateMachine.notifyTermIndexUpdated(1L, 4L);
 
-    assertEquals(0L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(4L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
+    assertTermIndex(0, 3, ozoneManagerStateMachine.getLastAppliedTermIndex());
+    assertTermIndex(1, 4, ozoneManagerStateMachine.getLastNotifiedTermIndex());
 
     // Add some apply transactions.
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 5L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 6L);
-
-    flushedEpochs.clear();
-    flushedEpochs.add(5L);
-    flushedEpochs.add(6L);
-    ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
-    assertEquals(0L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(6L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-
-  }
-
-
-  @Test
-  public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() {
-    // Now try a scenario where 1,2,3 transactions are in applyTransactionMap
-    // and updateLastAppliedIndex is not called for them, and before that
-    // notifyTermIndexUpdated is called with transaction 4. And see now at the
-    // end when updateLastAppliedIndex is called with epochs we have
-    // lastAppliedIndex as 4 or not.
-
-    // Conf/metadata transaction.
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1);
-    assertEquals(0, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(1, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-
-
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 2L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 3L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 4L);
-
-
-
-    // Conf/metadata transaction.
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 5L);
-
-  // Still it should be zero, as for 2,3,4 updateLastAppliedIndex is not yet
-    // called so the lastAppliedIndex will be at older value.
-    assertEquals(0L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(1L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-    List<Long> flushedEpochs = new ArrayList<>();
-
-
-    flushedEpochs.add(2L);
-    flushedEpochs.add(3L);
-    flushedEpochs.add(4L);
-
-    ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
-    assertEquals(0L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(5L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-  }
-
-
-  @Test
-  public void testLastAppliedIndexWithMultipleExecutors() {
-
-    // first flush batch
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 1L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 2L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 4L);
-
-    List<Long> flushedEpochs = new ArrayList<>();
-
-
-    flushedEpochs.add(1L);
-    flushedEpochs.add(2L);
-    flushedEpochs.add(4L);
-
-    ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
-    assertEquals(0L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(2L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-
-
-
-    // 2nd flush batch
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 3L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 5L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 6L);
-
-    flushedEpochs.clear();
-    flushedEpochs.add(3L);
-    flushedEpochs.add(5L);
-    flushedEpochs.add(6L);
-
-    ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
-
-    assertEquals(0L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(6L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
-
-    // 3rd flush batch
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 7L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 8L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 9L);
-    ozoneManagerStateMachine.addApplyTransactionTermIndex(0L, 10L);
-
-    flushedEpochs.clear();
-    flushedEpochs.add(7L);
-    flushedEpochs.add(8L);
-    flushedEpochs.add(9L);
-    flushedEpochs.add(10L);
-
-    ozoneManagerStateMachine.updateLastAppliedIndex(flushedEpochs);
+    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(1L, 
5L));
+    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(1L, 
6L));
 
-    assertEquals(0L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
-    assertEquals(10L, 
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());
+    assertTermIndex(1, 6, ozoneManagerStateMachine.getLastAppliedTermIndex());
+    assertTermIndex(1, 4, ozoneManagerStateMachine.getLastNotifiedTermIndex());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to