This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c50751a2c7 HDDS-14717. Improve test coverage for OzoneManagerStateMachine (#9825)
9c50751a2c7 is described below

commit 9c50751a2c7d8c3218d910a4d884506287455521
Author: Istvan Fajth <[email protected]>
AuthorDate: Sat Feb 28 06:38:43 2026 +0100

    HDDS-14717. Improve test coverage for OzoneManagerStateMachine (#9825)
    
    Generated-by: Claude - Opus 4.6
---
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |   35 +-
 .../om/ratis/TestOzoneManagerStateMachine.java     | 1080 +++++++++++++++++---
 .../protocolPB/TestOzoneManagerRequestHandler.java |   69 ++
 3 files changed, 1015 insertions(+), 169 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index a3ad217ceef..ddad0365291 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -73,7 +73,6 @@
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.IOUtils;
@@ -94,8 +93,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private static final String AUDIT_PARAM_PREVIOUS_LEADER = "previousLeader";
   private static final String AUDIT_PARAM_NEW_LEADER = "newLeader";
   private RaftPeerId previousLeaderId = null;
-  private final SimpleStateMachineStorage storage =
-      new SimpleStateMachineStorage();
   private final OzoneManager ozoneManager;
   private RequestHandler handler;
   private volatile OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
@@ -135,15 +132,32 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer,
     this.nettyMetrics = NettyMetrics.create();
   }
 
+  @VisibleForTesting
+  OzoneManagerStateMachine(OzoneManager ozoneManager,
+      OzoneManagerDoubleBuffer doubleBuffer,
+      RequestHandler handler,
+      ExecutorService executorService,
+      NettyMetrics nettyMetrics) {
+    this.isTracingEnabled = false;
+    this.ozoneManager = ozoneManager;
+    this.threadPrefix = "";
+    this.ozoneManagerDoubleBuffer = doubleBuffer;
+    this.handler = handler;
+    this.executorService = executorService;
+    ThreadFactory installSnapshotThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("TestInstallSnapshotThread").build();
+    this.installSnapshotExecutor =
+        HadoopExecutors.newSingleThreadExecutor(installSnapshotThreadFactory);
+    this.nettyMetrics = nettyMetrics;
+  }
+
   /**
    * Initializes the State Machine with the given server, group and storage.
    */
   @Override
-  public void initialize(RaftServer server, RaftGroupId id,
-      RaftStorage raftStorage) throws IOException {
+  public void initialize(RaftServer server, RaftGroupId id, RaftStorage raftStorage) throws IOException {
     getLifeCycle().startAndTransition(() -> {
       super.initialize(server, id, raftStorage);
-      storage.init(raftStorage);
       LOG.info("{}: initialize {} with {}", getId(), id, 
getLastAppliedTermIndex());
     });
   }
@@ -419,7 +433,8 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     }
   }
 
-  private Message processResponse(OMResponse omResponse) {
+  @VisibleForTesting
+  Message processResponse(OMResponse omResponse) {
     if (!omResponse.getSuccess()) {
       // INTERNAL_ERROR or METADATA_ERROR are considered as critical errors.
       // In such cases, OM must be terminated instead of completing the future 
exceptionally,
@@ -593,7 +608,8 @@ public void close() {
    * @param request OMRequest
    * @return response from OM
    */
-  private OMResponse runCommand(OMRequest request, TermIndex termIndex) {
+  @VisibleForTesting
+  OMResponse runCommand(OMRequest request, TermIndex termIndex) {
     try {
       ExecutionContext context = ExecutionContext.of(termIndex.getIndex(), 
termIndex);
       final OMClientResponse omClientResponse = handler.handleWriteRequest(
@@ -617,7 +633,8 @@ private OMResponse runCommand(OMRequest request, TermIndex termIndex) {
     return null;
   }
 
-  private OMResponse createErrorResponse(
+  @VisibleForTesting
+  OMResponse createErrorResponse(
       OMRequest omRequest, IOException exception, TermIndex termIndex) {
     OMResponse.Builder omResponseBuilder = OMResponse.newBuilder()
         .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index 797f6aa2c3e..d799556a6f9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -20,261 +20,1021 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.any;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
 import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.OMSystemAction;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmConfig;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareRequestArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
+import org.apache.hadoop.ozone.protocolPB.RequestHandler;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.ExitUtils;
-import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mockito;
+import org.mockito.ArgumentCaptor;
 
 /**
  * Class to test OzoneManagerStateMachine.
  */
 public class TestOzoneManagerStateMachine {
 
-  @TempDir
-  private Path tempDir;
-
-  private OzoneManagerStateMachine ozoneManagerStateMachine;
-  private OzoneManagerPrepareState prepareState;
-  private AuditLogger auditLogger;
+  private OzoneManager om;
+  private OzoneManagerDoubleBuffer doubleBuffer;
+  private RequestHandler handler;
+  private ExecutorService executor;
+  private OzoneManagerStateMachine sm;
 
   @BeforeEach
-  public void setup() throws Exception {
-    OzoneManagerRatisServer ozoneManagerRatisServer =
-        mock(OzoneManagerRatisServer.class);
-    OzoneManager ozoneManager = mock(OzoneManager.class);
-    // Allow testing of prepare pre-append gate.
-    when(ozoneManager.isAdmin(any(UserGroupInformation.class)))
-        .thenReturn(true);
+  public void setup() {
+    ExitUtils.disableSystemExit();
+    om = mock(OzoneManager.class);
+    doubleBuffer = mock(OzoneManagerDoubleBuffer.class);
+    handler = mock(RequestHandler.class);
+    executor = Executors.newSingleThreadExecutor();
+    sm = new OzoneManagerStateMachine(om, doubleBuffer, handler, executor, 
null);
+  }
+
+  @AfterEach
+  public void tearDown() {
+    sm.stop();
+  }
+
+  // --- startTransaction tests ---
+
+  @Test
+  public void testStartTransactionHappyPath() throws Exception {
+    RaftGroupId groupId = initializeStateMachine();
+    OMRequest omRequest = sampleWriteRequest();
+    RaftClientRequest clientRequest = buildClientRequest(groupId, omRequest);
+
+    TransactionContext trx = sm.startTransaction(clientRequest);
+
+    assertNotNull(trx);
+    assertNotNull(trx.getStateMachineContext());
+    assertInstanceOf(OMRequest.class, trx.getStateMachineContext());
+    assertNotNull(trx.getStateMachineLogEntry().getLogData());
+    assertNull(trx.getException());
+    verify(handler).validateRequest(any(OMRequest.class));
+  }
+
+  @Test
+  public void testStartTransactionValidationFailure() throws Exception {
+    RaftGroupId groupId = initializeStateMachine();
+    OMRequest omRequest = sampleWriteRequest();
+    RaftClientRequest clientRequest = buildClientRequest(groupId, omRequest);
+
+    OMException validationError = new OMException("request validation failed",
+        OMException.ResultCodes.INVALID_REQUEST);
+    
doThrow(validationError).when(handler).validateRequest(any(OMRequest.class));
+
+    TransactionContext trx = sm.startTransaction(clientRequest);
 
+    assertNotNull(trx);
+    assertNotNull(trx.getException());
+    assertEquals(validationError, trx.getException());
+  }
+
+  @Test
+  public void testStartTransactionGroupIdMismatchThrows() throws Exception {
+    initializeStateMachine();
+    RaftGroupId wrongGroupId = RaftGroupId.randomId();
+    OMRequest omRequest = sampleWriteRequest();
+    RaftClientRequest clientRequest = buildClientRequest(wrongGroupId, 
omRequest);
+
+    assertThrows(IllegalArgumentException.class,
+        () -> sm.startTransaction(clientRequest));
+  }
+
+  // --- preAppendTransaction tests ---
+
+  @Test
+  public void testPreAppendTransactionPrepareGate() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OMConfigKeys.OZONE_OM_DB_DIRS,
-        tempDir.toAbsolutePath().toString());
+    OzoneManagerPrepareState ps = new OzoneManagerPrepareState(conf);
+    when(om.getPrepareState()).thenReturn(ps);
+    when(om.isAdmin(any(UserGroupInformation.class))).thenReturn(true);
 
-    OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(conf,
-        ozoneManager);
+    OMRequest writeRequest = sampleWriteRequest();
+    OMRequest prepareRequest = OMRequest.newBuilder()
+        .setPrepareRequest(PrepareRequest.newBuilder()
+            .setArgs(PrepareRequestArgs.getDefaultInstance()))
+        .setCmdType(Type.Prepare)
+        .setClientId("123")
+        .setUserInfo(UserInfo.newBuilder()
+            .setUserName("user")
+            .setHostName("localhost")
+            .setRemoteAddress("127.0.0.1"))
+        .build();
 
-    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
-    auditLogger = mock(AuditLogger.class);
+    // Write request passes before prepare
+    TransactionContext writeTrx = mockTrx(writeRequest, 1, 1);
+    assertSame(writeTrx, sm.preAppendTransaction(writeTrx));
+    assertEquals(PrepareStatus.NOT_PREPARED, ps.getState().getStatus());
 
-    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
-    prepareState = new OzoneManagerPrepareState(conf);
-    when(ozoneManager.getPrepareState()).thenReturn(prepareState);
+    // Prepare enables gate
+    TransactionContext prepareTrx = mockTrx(prepareRequest, 1, 2);
+    assertSame(prepareTrx, sm.preAppendTransaction(prepareTrx));
+    assertEquals(PrepareStatus.PREPARE_GATE_ENABLED,
+        ps.getState().getStatus());
+
+    // Write request now blocked
+    TransactionContext writeTrx2 = mockTrx(writeRequest, 1, 3);
+    StateMachineException ex = assertThrows(StateMachineException.class,
+        () -> sm.preAppendTransaction(writeTrx2));
+    assertFalse(ex.leaderShouldStepDown());
+    assertInstanceOf(OMException.class, ex.getCause());
+    assertEquals(OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED,
+        ((OMException) ex.getCause()).getResult());
 
-    when(ozoneManagerRatisServer.getOzoneManager()).thenReturn(ozoneManager);
-    
when(ozoneManager.getTransactionInfo()).thenReturn(mock(TransactionInfo.class));
-    when(ozoneManager.getConfiguration()).thenReturn(conf);
-    final OmConfig omConfig = conf.getObject(OmConfig.class);
-    when(ozoneManager.getConfig()).thenReturn(omConfig);
-    ozoneManagerStateMachine =
-        new OzoneManagerStateMachine(ozoneManagerRatisServer, false);
+    // Can prepare again
+    TransactionContext prepareTrx2 = mockTrx(prepareRequest, 1, 4);
+    assertSame(prepareTrx2, sm.preAppendTransaction(prepareTrx2));
   }
 
-  static void assertTermIndex(long expectedTerm, long expectedIndex, TermIndex 
computed) {
-    assertEquals(expectedTerm, computed.getTerm());
-    assertEquals(expectedIndex, computed.getIndex());
+  @Test
+  public void testPreAppendTransactionAclDenied() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    OzoneManagerPrepareState ps = new OzoneManagerPrepareState(conf);
+    when(om.getPrepareState()).thenReturn(ps);
+    when(om.getAclsEnabled()).thenReturn(true);
+    when(om.isAdmin(any(UserGroupInformation.class))).thenReturn(false);
+
+    OMRequest prepareRequest = OMRequest.newBuilder()
+        .setPrepareRequest(PrepareRequest.newBuilder()
+            .setArgs(PrepareRequestArgs.getDefaultInstance()))
+        .setCmdType(Type.Prepare)
+        .setClientId("123")
+        .setUserInfo(UserInfo.newBuilder()
+            .setUserName("nonAdminUser")
+            .setHostName("localhost")
+            .setRemoteAddress("127.0.0.1"))
+        .build();
+
+    TransactionContext trx = mockTrx(prepareRequest, 1, 1);
+
+    StateMachineException ex = assertThrows(StateMachineException.class,
+        () -> sm.preAppendTransaction(trx));
+    assertFalse(ex.leaderShouldStepDown());
+    assertInstanceOf(OMException.class, ex.getCause());
+    assertEquals(OMException.ResultCodes.ACCESS_DENIED,
+        ((OMException) ex.getCause()).getResult());
   }
 
+  // --- applyTransaction tests ---
+
   @Test
-  public void testLastAppliedIndex() {
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
-    assertTermIndex(0, 0, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(0, 0, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+  public void testApplyTransactionHappyPath() throws Exception {
+    OMRequest request = sampleWriteRequest();
+    TransactionContext trx = mockTrx(request, 1, 5);
+
+    OMResponse expectedResponse = OMResponse.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setStatus(Status.OK)
+        .setSuccess(true)
+        .build();
 
-    // Conf/metadata transaction.
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1);
-    assertTermIndex(0, 1, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(0, 1, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+    OMClientResponse clientResponse = mock(OMClientResponse.class);
+    when(clientResponse.getOMResponse()).thenReturn(expectedResponse);
+    when(clientResponse.getOmLockDetails()).thenReturn(null);
 
-    // call update last applied index
-    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(0, 
2));
-    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(0, 
3));
+    when(handler.handleWriteRequest(eq(request), any(), eq(doubleBuffer)))
+        .thenReturn(clientResponse);
 
-    assertTermIndex(0, 3, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(0, 1, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+    CompletableFuture<Message> future = sm.applyTransaction(trx);
+    Message result = future.get();
 
-    // Conf/metadata transaction.
-    ozoneManagerStateMachine.notifyTermIndexUpdated(1L, 4L);
+    assertNotNull(result);
+    verify(doubleBuffer).acquireUnFlushedTransactions(1);
+  }
+
+  @Test
+  public void testApplyTransactionFollowerPath() throws Exception {
+    OMRequest request = sampleWriteRequest();
+    // Null context simulates follower: request deserialized from log data
+    TransactionContext trx = mockTrx(request, 1, 5, null);
+
+    OMResponse expectedResponse = OMResponse.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setStatus(Status.OK)
+        .setSuccess(true)
+        .build();
+
+    OMClientResponse clientResponse = mock(OMClientResponse.class);
+    when(clientResponse.getOMResponse()).thenReturn(expectedResponse);
+    when(clientResponse.getOmLockDetails()).thenReturn(null);
 
-    assertTermIndex(1, 4, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(1, 4, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+    when(handler.handleWriteRequest(any(OMRequest.class), any(), 
eq(doubleBuffer)))
+        .thenReturn(clientResponse);
 
-    // Add some apply transactions.
-    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(1L, 
5L));
-    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(1L, 
6L));
+    CompletableFuture<Message> future = sm.applyTransaction(trx);
+    Message result = future.get();
 
-    assertTermIndex(1, 6, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(1, 4, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+    assertNotNull(result);
   }
 
   @Test
-  public void testNotifyTermIndexPendingBufferUpdateIndex() {
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
-    assertTermIndex(0, 0, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(0, 0, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+  public void testApplyTransactionBackpressureInterrupt() throws Exception {
+    OMRequest request = sampleWriteRequest();
+    TransactionContext trx = mockTrx(request, 1, 5);
 
-    // notifyTermIndex with skipping one of transaction which is from 
applyTransaction
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 2);
-    ozoneManagerStateMachine.notifyTermIndexUpdated(0, 3);
-    assertTermIndex(0, 0, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(0, 3, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+    doThrow(new InterruptedException("backpressure"))
+        .when(doubleBuffer).acquireUnFlushedTransactions(1);
 
-    // applyTransaction update with missing transaction as above
-    ozoneManagerStateMachine.updateLastAppliedTermIndex(TermIndex.valueOf(0, 
1));
-    assertTermIndex(0, 3, ozoneManagerStateMachine.getLastAppliedTermIndex());
+    CompletableFuture<Message> future = sm.applyTransaction(trx);
 
-    assertTermIndex(0, 3, ozoneManagerStateMachine.getLastAppliedTermIndex());
-    assertTermIndex(0, 3, ozoneManagerStateMachine.getLastNotifiedTermIndex());
+    ExecutionException ex = assertThrows(ExecutionException.class, 
future::get);
+    assertInstanceOf(InterruptedException.class, ex.getCause());
   }
 
+  // --- runCommand tests ---
+
   @Test
-  public void testPreAppendTransaction() throws Exception {
-    // Submit write request.
-    KeyArgs args = KeyArgs
-        .newBuilder()
-        .setVolumeName("volume")
-        .setBucketName("bucket")
-        .setKeyName("key")
+  public void testRunCommandHappyPath() throws Exception {
+    OMRequest request = sampleWriteRequest();
+    TermIndex ti = TermIndex.valueOf(1, 5);
+
+    OMResponse expectedResponse = OMResponse.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setStatus(Status.OK)
+        .setSuccess(true)
         .build();
-    OMRequest createKeyRequest = OMRequest.newBuilder()
-        .setCreateKeyRequest(CreateKeyRequest.newBuilder().setKeyArgs(args))
+
+    OMLockDetails lockDetails = mock(OMLockDetails.class);
+    when(lockDetails.toProtobufBuilder()).thenReturn(
+        OzoneManagerProtocolProtos.OMLockDetailsProto.newBuilder()
+            .setIsLockAcquired(true)
+            .setWaitLockNanos(100)
+            .setReadLockNanos(50)
+            .setWriteLockNanos(200));
+
+    OMClientResponse clientResponse = mock(OMClientResponse.class);
+    when(clientResponse.getOMResponse()).thenReturn(expectedResponse);
+    when(clientResponse.getOmLockDetails()).thenReturn(lockDetails);
+
+    when(handler.handleWriteRequest(eq(request), any(), eq(doubleBuffer)))
+        .thenReturn(clientResponse);
+
+    OMResponse result = sm.runCommand(request, ti);
+
+    assertNotNull(result);
+    assertTrue(result.getSuccess());
+    assertEquals(Status.OK, result.getStatus());
+    assertTrue(result.hasOmLockDetails());
+  }
+
+  @Test
+  public void testRunCommandNullLockDetails() throws Exception {
+    OMRequest request = sampleWriteRequest();
+    TermIndex ti = TermIndex.valueOf(1, 5);
+
+    OMResponse expectedResponse = OMResponse.newBuilder()
         .setCmdType(Type.CreateKey)
-        .setClientId("123")
-        .setUserInfo(UserInfo
-            .newBuilder()
-            .setUserName("user")
-            .setHostName("localhost")
-            .setRemoteAddress("127.0.0.1"))
+        .setStatus(Status.OK)
+        .setSuccess(true)
         .build();
-    // Without prepare enabled, the txn should be returned unaltered.
-    TransactionContext submittedTrx = mockTransactionContext(createKeyRequest);
-    TransactionContext returnedTrx =
-        ozoneManagerStateMachine.preAppendTransaction(submittedTrx);
-    assertSame(submittedTrx, returnedTrx);
 
-    assertEquals(PrepareStatus.NOT_PREPARED, 
prepareState.getState().getStatus());
+    OMClientResponse clientResponse = mock(OMClientResponse.class);
+    when(clientResponse.getOMResponse()).thenReturn(expectedResponse);
+    when(clientResponse.getOmLockDetails()).thenReturn(null);
 
-    // Submit prepare request.
-    OMRequest prepareRequest = OMRequest.newBuilder()
-        .setPrepareRequest(
-            PrepareRequest.newBuilder()
-                .setArgs(PrepareRequestArgs.getDefaultInstance()))
-        .setCmdType(Type.Prepare)
-        .setClientId("123")
-        .setUserInfo(UserInfo
-            .newBuilder()
-            .setUserName("user")
-            .setHostName("localhost")
-            .setRemoteAddress("127.0.0.1"))
+    when(handler.handleWriteRequest(eq(request), any(), eq(doubleBuffer)))
+        .thenReturn(clientResponse);
+
+    OMResponse result = sm.runCommand(request, ti);
+
+    assertNotNull(result);
+    assertEquals(Status.OK, result.getStatus());
+    assertFalse(result.hasOmLockDetails());
+  }
+
+  @Test
+  public void testRunCommandIOException() throws Exception {
+    OMRequest request = sampleWriteRequest();
+    TermIndex ti = TermIndex.valueOf(1, 5);
+
+    when(handler.handleWriteRequest(eq(request), any(), eq(doubleBuffer)))
+        .thenThrow(new IOException("disk full"));
+
+    OMResponse result = sm.runCommand(request, ti);
+
+    assertNotNull(result);
+    assertFalse(result.getSuccess());
+    verify(doubleBuffer).add(any(), eq(ti));
+  }
+
+  @Test
+  public void testRunCommandRuntimeExceptionTerminates() throws Exception {
+    OMRequest request = sampleWriteRequest();
+    TermIndex ti = TermIndex.valueOf(1, 5);
+
+    when(handler.handleWriteRequest(eq(request), any(), eq(doubleBuffer)))
+        .thenThrow(new NullPointerException("boom"));
+
+    // With ExitUtils.disableSystemExit(), terminate throws ExitException
+    // instead of calling System.exit(). The catch block in runCommand
+    // calls ExitUtils.terminate which throws ExitException.
+    assertThrows(ExitUtils.ExitException.class,
+        () -> sm.runCommand(request, ti));
+  }
+
+  // --- processResponse tests ---
+
+  @Test
+  public void testProcessResponseSuccess() {
+    OMResponse response = OMResponse.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setStatus(Status.OK)
+        .setSuccess(true)
         .build();
 
-    submittedTrx = mockTransactionContext(prepareRequest);
-    returnedTrx = ozoneManagerStateMachine.preAppendTransaction(submittedTrx);
-    assertSame(submittedTrx, returnedTrx);
+    Message result = sm.processResponse(response);
+    assertNotNull(result);
+  }
 
-    // Prepare should be started.
-    assertEquals(PrepareStatus.PREPARE_GATE_ENABLED,
-        prepareState.getState().getStatus());
+  @Test
+  public void testProcessResponseNonCriticalError() {
+    OMResponse response = OMResponse.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setStatus(Status.KEY_NOT_FOUND)
+        .setSuccess(false)
+        .setMessage("key not found")
+        .build();
 
-    // Submitting a write request should now fail.
-    StateMachineException smEx =
-        assertThrows(StateMachineException.class,
-            () -> 
ozoneManagerStateMachine.preAppendTransaction(mockTransactionContext(createKeyRequest)),
-            "Expected StateMachineException to be thrown when submitting write 
request while prepared.");
-    assertFalse(smEx.leaderShouldStepDown());
+    Message result = sm.processResponse(response);
+    assertNotNull(result);
+  }
 
-    Throwable cause = smEx.getCause();
-    OMException omException = assertInstanceOf(OMException.class, cause);
-    assertEquals(omException.getResult(), 
OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED);
+  @Test
+  public void testProcessResponseInternalErrorTerminates() {
+    OMResponse response = OMResponse.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setStatus(Status.INTERNAL_ERROR)
+        .setSuccess(false)
+        .setMessage("internal error")
+        .build();
 
-    // Should be able to prepare again without issue.
-    submittedTrx = mockTransactionContext(prepareRequest);
-    returnedTrx = ozoneManagerStateMachine.preAppendTransaction(submittedTrx);
-    assertSame(submittedTrx, returnedTrx);
+    // terminate throws ExitException when system exit is disabled
+    assertThrows(ExitUtils.ExitException.class,
+        () -> sm.processResponse(response));
+  }
 
-    assertEquals(PrepareStatus.PREPARE_GATE_ENABLED, 
prepareState.getState().getStatus());
+  @Test
+  public void testProcessResponseMetadataErrorTerminates() {
+    OMResponse response = OMResponse.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setStatus(Status.METADATA_ERROR)
+        .setSuccess(false)
+        .setMessage("metadata error")
+        .build();
 
-    // Cancel prepare is handled in the cancel request apply txn step, not
-    // the pre-append state machine step, so it is tested in other classes.
+    assertThrows(ExitUtils.ExitException.class,
+        () -> sm.processResponse(response));
   }
 
+  // --- createErrorResponse tests ---
+
   @Test
-  public void testApplyTransactionExceptionAuditLog() throws Exception {
-    ExitUtils.disableSystemExit();
-    // submit a create volume request having null pointer exception
-    OzoneManagerProtocolProtos.VolumeInfo volInfo = 
OzoneManagerProtocolProtos.VolumeInfo.newBuilder()
-        .setAdminName("a").setOwnerName("a").setVolume("a").build();
-    OMRequest createVolRequest = OMRequest.newBuilder()
-        
.setCreateVolumeRequest(OzoneManagerProtocolProtos.CreateVolumeRequest.newBuilder().setVolumeInfo(volInfo))
-        .setCmdType(Type.CreateVolume).setClientId("123")
-        
.setUserInfo(UserInfo.newBuilder().setUserName("user").setHostName("localhost").setRemoteAddress("127.0.0.1"))
+  public void testCreateErrorResponse() {
+    OMRequest request = sampleWriteRequest();
+    IOException ex = new IOException("test error");
+    TermIndex ti = TermIndex.valueOf(1, 5);
+
+    OMResponse response = sm.createErrorResponse(request, ex, ti);
+
+    assertNotNull(response);
+    assertFalse(response.getSuccess());
+    assertEquals(Type.CreateKey, response.getCmdType());
+    assertEquals("test error", response.getMessage());
+    verify(doubleBuffer).add(any(), eq(ti));
+  }
+
+  @Test
+  public void testCreateErrorResponseNullMessage() {
+    OMRequest request = sampleWriteRequest();
+    IOException ex = new IOException((String) null);
+    TermIndex ti = TermIndex.valueOf(1, 5);
+
+    OMResponse response = sm.createErrorResponse(request, ex, ti);
+
+    assertNotNull(response);
+    assertFalse(response.getSuccess());
+    assertFalse(response.hasMessage());
+  }
+
+  // --- query tests ---
+
+  @Test
+  public void testQueryHappyPath() throws Exception {
+    OMRequest request = sampleReadRequest();
+    OMResponse expectedResponse = OMResponse.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setStatus(Status.OK)
+        .setSuccess(true)
+        .build();
+
+    when(handler.handleReadRequest(any(OMRequest.class)))
+        .thenReturn(expectedResponse);
+
+    Message queryMessage = Message.valueOf(
+        OMRatisHelper.convertRequestToByteString(request));
+    CompletableFuture<Message> future = sm.query(queryMessage);
+    Message result = future.get();
+
+    assertNotNull(result);
+    verify(handler).handleReadRequest(any(OMRequest.class));
+  }
+
+  @Test
+  public void testQueryIOException() {
+    // Garbage bytes that can't be parsed as OMRequest
+    Message badMessage = Message.valueOf("not-a-valid-protobuf");
+
+    CompletableFuture<Message> future = sm.query(badMessage);
+
+    ExecutionException ex = assertThrows(ExecutionException.class, future::get);
+    assertInstanceOf(IOException.class, ex.getCause());
+  }
+
+  // --- notifyTermIndexUpdated tests ---
+
+  @Test
+  public void testNotifyTermIndexSameIndexOk() {
+    sm.notifyTermIndexUpdated(1, 5);
+    // Same index should not throw
+    sm.notifyTermIndexUpdated(1, 5);
+
+    assertTermIndex(1, 5, sm.getLastNotifiedTermIndex());
+  }
+
+  @Test
+  public void testNotifyTermIndexDecreasingThrows() {
+    sm.notifyTermIndexUpdated(1, 5);
+
+    assertThrows(IllegalArgumentException.class,
+        () -> sm.notifyTermIndexUpdated(1, 4));
+  }
+
+  // --- updateLastAppliedTermIndex tests ---
+
+  @Test
+  public void testUpdateLastAppliedInSkipRange() {
+    sm.notifyTermIndexUpdated(0, 0);
+    // Skip index 1 (from applyTransaction), notify 2 and 3
+    sm.notifyTermIndexUpdated(0, 2);
+    sm.notifyTermIndexUpdated(0, 3);
+
+    assertTermIndex(0, 0, sm.getLastAppliedTermIndex());
+
+    // Update with index 1 which is in skip range [1, 3]
+    sm.updateLastAppliedTermIndex(TermIndex.valueOf(0, 1));
+
+    // Should jump to lastNotified (3)
+    assertTermIndex(0, 3, sm.getLastAppliedTermIndex());
+  }
+
+  @Test
+  public void testUpdateLastAppliedBelowSkipRange() {
+    sm.notifyTermIndexUpdated(0, 0);
+    // Notify consecutive: no skip
+    sm.notifyTermIndexUpdated(0, 1);
+
+    assertTermIndex(0, 1, sm.getLastAppliedTermIndex());
+
+    // Direct update
+    sm.updateLastAppliedTermIndex(TermIndex.valueOf(0, 2));
+    assertTermIndex(0, 2, sm.getLastAppliedTermIndex());
+  }
+
+  @Test
+  public void testLastAppliedIndexSequence() {
+    sm.notifyTermIndexUpdated(0, 0);
+    assertTermIndex(0, 0, sm.getLastAppliedTermIndex());
+    assertTermIndex(0, 0, sm.getLastNotifiedTermIndex());
+
+    // Consecutive notification advances both
+    sm.notifyTermIndexUpdated(0, 1);
+    assertTermIndex(0, 1, sm.getLastAppliedTermIndex());
+    assertTermIndex(0, 1, sm.getLastNotifiedTermIndex());
+
+    // Direct updates advance lastApplied but NOT lastNotified
+    sm.updateLastAppliedTermIndex(TermIndex.valueOf(0, 2));
+    sm.updateLastAppliedTermIndex(TermIndex.valueOf(0, 3));
+    assertTermIndex(0, 3, sm.getLastAppliedTermIndex());
+    assertTermIndex(0, 1, sm.getLastNotifiedTermIndex());
+
+    // Term change via notification advances both
+    sm.notifyTermIndexUpdated(1L, 4L);
+    assertTermIndex(1, 4, sm.getLastAppliedTermIndex());
+    assertTermIndex(1, 4, sm.getLastNotifiedTermIndex());
+
+    // More direct updates
+    sm.updateLastAppliedTermIndex(TermIndex.valueOf(1L, 5L));
+    sm.updateLastAppliedTermIndex(TermIndex.valueOf(1L, 6L));
+    assertTermIndex(1, 6, sm.getLastAppliedTermIndex());
+    assertTermIndex(1, 4, sm.getLastNotifiedTermIndex());
+  }
+
+  // --- takeSnapshot tests ---
+
+  @Test
+  public void testTakeSnapshotAlreadyCaughtUp() throws Exception {
+    // Notify incrementally so lastApplied advances via the
+    // "notified - applied == 1" fast path in notifyTermIndexUpdated.
+    // A single jump from -1 to 1 would set lastSkipped=0 without
+    // updating lastApplied, causing takeSnapshot to loop forever.
+    sm.notifyTermIndexUpdated(1, 0);
+    sm.notifyTermIndexUpdated(1, 1);
+    // lastApplied=1, lastSkipped=-1 → already caught up
+    assertTermIndex(1, 1, sm.getLastAppliedTermIndex());
+
+    OMMetadataManager metaMgr = mock(OMMetadataManager.class);
+    @SuppressWarnings("unchecked")
+    Table<String, TransactionInfo> txnTable = mock(Table.class);
+    DBStore store = mock(DBStore.class);
+    when(om.getMetadataManager()).thenReturn(metaMgr);
+    when(metaMgr.getTransactionInfoTable()).thenReturn(txnTable);
+    when(metaMgr.getStore()).thenReturn(store);
+
+    long snapshotIndex = sm.takeSnapshot();
+
+    assertEquals(1, snapshotIndex);
+    verify(store).flushDB();
+    verify(om).setTransactionInfo(any(TransactionInfo.class));
+  }
+
+  @Test
+  public void testTakeSnapshotWaitsForFlush() throws Exception {
+    // Create a skip gap: notify 0, then skip to 3
+    sm.notifyTermIndexUpdated(1, 0);
+    sm.notifyTermIndexUpdated(1, 2);
+    sm.notifyTermIndexUpdated(1, 3);
+    // lastApplied=0, lastSkipped=1, so takeSnapshot will loop
+
+    when(om.isStopped()).thenReturn(false);
+    // On first awaitFlush, simulate the double buffer catching up
+    doAnswer(invocation -> {
+      // Simulate applyTransaction completing index 1
+      sm.updateLastAppliedTermIndex(TermIndex.valueOf(1, 1));
+      // This should jump to lastNotified=3 due to skip range
+      return null;
+    }).when(doubleBuffer).awaitFlush();
+
+    OMMetadataManager metaMgr = mock(OMMetadataManager.class);
+    @SuppressWarnings("unchecked")
+    Table<String, TransactionInfo> txnTable = mock(Table.class);
+    DBStore store = mock(DBStore.class);
+    when(om.getMetadataManager()).thenReturn(metaMgr);
+    when(metaMgr.getTransactionInfoTable()).thenReturn(txnTable);
+    when(metaMgr.getStore()).thenReturn(store);
+
+    long snapshotIndex = sm.takeSnapshot();
+
+    assertEquals(3, snapshotIndex);
+    verify(doubleBuffer).awaitFlush();
+    verify(store).flushDB();
+  }
+
+  @Test
+  public void testTakeSnapshotOmStopped() {
+    when(om.isStopped()).thenReturn(true);
+
+    // Set lastSkippedIndex > lastApplied to enter the wait loop
+    sm.notifyTermIndexUpdated(0, 0);
+    sm.notifyTermIndexUpdated(0, 2);  // skips index 1
+
+    assertThrows(IOException.class, () -> sm.takeSnapshot());
+  }
+
+  // --- loadSnapshotInfoFromDB tests ---
+
+  @Test
+  public void testLoadSnapshotInfoFromDBExists(@TempDir Path tmpDir) throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OMConfigKeys.OZONE_OM_DB_DIRS, tmpDir.toAbsolutePath().toString());
+    OzoneManager realishOm = mock(OzoneManager.class);
+    when(realishOm.getConfiguration()).thenReturn(conf);
+    when(realishOm.getConfig()).thenReturn(conf.getObject(OmConfig.class));
+    OMMetadataManager realMetaMgr = new OmMetadataManagerImpl(conf, realishOm);
+    when(realishOm.getMetadataManager()).thenReturn(realMetaMgr);
+    when(realishOm.getTransactionInfo()).thenReturn(mock(TransactionInfo.class));
+
+    // Write a TransactionInfo to DB
+    TransactionInfo txnInfo = TransactionInfo.valueOf(TermIndex.valueOf(3, 42));
+    realMetaMgr.getTransactionInfoTable().put(OzoneConsts.TRANSACTION_INFO_KEY, txnInfo);
+
+    OzoneManagerStateMachine testSm =
+        new OzoneManagerStateMachine(realishOm, doubleBuffer, handler, executor, null);
+    testSm.loadSnapshotInfoFromDB();
+
+    assertTermIndex(3, 42, testSm.getLastAppliedTermIndex());
+    verify(realishOm).setTransactionInfo(any(TransactionInfo.class));
+  }
+
+  @Test
+  public void testLoadSnapshotInfoFromDBMissing(@TempDir Path tmpDir) throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OMConfigKeys.OZONE_OM_DB_DIRS, tmpDir.toAbsolutePath().toString());
+    OzoneManager realishOm = mock(OzoneManager.class);
+    when(realishOm.getConfiguration()).thenReturn(conf);
+    when(realishOm.getConfig()).thenReturn(conf.getObject(OmConfig.class));
+    OMMetadataManager realMetaMgr = new OmMetadataManagerImpl(conf, realishOm);
+    when(realishOm.getMetadataManager()).thenReturn(realMetaMgr);
+
+    OzoneManagerStateMachine testSm =
+        new OzoneManagerStateMachine(realishOm, doubleBuffer, handler, executor, null);
+    // Should not throw when TransactionInfo is not found
+    testSm.loadSnapshotInfoFromDB();
+
+    // lastApplied should remain at default (0, -1)
+    assertTermIndex(0, -1, testSm.getLastAppliedTermIndex());
+  }
+
+  // --- notifyLeaderChanged tests ---
+
+  @Test
+  public void testNotifyLeaderChangedSameNode() {
+    RaftPeerId peerId = RaftPeerId.valueOf("om1");
+    RaftGroupId groupId = RaftGroupId.randomId();
+    RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
+
+    when(om.getConfiguration()).thenReturn(new OzoneConfiguration());
+    when(om.buildAuditMessageForSuccess(any(), any()))
+        .thenReturn(dummyAuditMessage());
+
+    sm.notifyLeaderChanged(memberId, peerId);
+
+    // Same node becomes leader → should warm up EDEK cache
+    verify(om).initializeEdekCache(any());
+    verify(om).omHAMetricsInit("om1");
+  }
+
+  @Test
+  public void testNotifyLeaderChangedDifferentNode() {
+    RaftPeerId localPeerId = RaftPeerId.valueOf("om1");
+    RaftPeerId leaderPeerId = RaftPeerId.valueOf("om2");
+    RaftGroupId groupId = RaftGroupId.randomId();
+    RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(localPeerId, groupId);
+
+    when(om.buildAuditMessageForSuccess(any(), any()))
+        .thenReturn(dummyAuditMessage());
+
+    sm.notifyLeaderChanged(memberId, leaderPeerId);
+
+    // Different node is leader → should NOT warm up cache
+    verify(om, never()).initializeEdekCache(any());
+    verify(om).omHAMetricsInit("om2");
+  }
+
+  @Test
+  public void testNotifyLeaderChangedAuditParams() {
+    when(om.buildAuditMessageForSuccess(any(), any()))
+        .thenReturn(dummyAuditMessage());
+
+    RaftGroupId groupId = RaftGroupId.randomId();
+
+    // First leader change: previousLeader should be "NONE"
+    RaftPeerId leader1 = RaftPeerId.valueOf("om1");
+    sm.notifyLeaderChanged(
+        RaftGroupMemberId.valueOf(RaftPeerId.valueOf("om2"), groupId),
+        leader1);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<Map<String, String>> captor =
+        ArgumentCaptor.forClass(Map.class);
+    verify(om).buildAuditMessageForSuccess(
+        eq(OMSystemAction.LEADER_CHANGE), captor.capture());
+    Map<String, String> params = captor.getValue();
+    assertEquals("NONE", params.get("previousLeader"));
+    assertEquals("om1", params.get("newLeader"));
+
+    // Second leader change: previousLeader should be "om1"
+    RaftPeerId leader2 = RaftPeerId.valueOf("om3");
+    sm.notifyLeaderChanged(
+        RaftGroupMemberId.valueOf(RaftPeerId.valueOf("om2"), groupId),
+        leader2);
+
+    verify(om, times(2)).buildAuditMessageForSuccess(
+        eq(OMSystemAction.LEADER_CHANGE), captor.capture());
+    Map<String, String> params2 = captor.getValue();
+    assertEquals("om1", params2.get("previousLeader"));
+    assertEquals("om3", params2.get("newLeader"));
+  }
+
+  // --- notifyConfigurationChanged tests ---
+
+  @Test
+  public void testNotifyConfigurationChanged() {
+    RaftProtos.RaftPeerProto peer1 = RaftProtos.RaftPeerProto.newBuilder()
+        .setId(ByteString.copyFromUtf8("om1"))
+        .setAddress("host1:9862")
+        .build();
+    RaftProtos.RaftPeerProto peer2 = RaftProtos.RaftPeerProto.newBuilder()
+        .setId(ByteString.copyFromUtf8("om2"))
+        .setAddress("host2:9862")
+        .build();
+    RaftProtos.RaftPeerProto listener = RaftProtos.RaftPeerProto.newBuilder()
+        .setId(ByteString.copyFromUtf8("om3"))
+        .setAddress("host3:9862")
         .build();
-    TransactionContext submittedTrx = mockTransactionContext(createVolRequest);
-    Mockito.doAnswer((i) -> {
-      if (!((AuditMessage) i.getArgument(0)).getFormattedMessage().contains("Transaction=10") ||
-          !((AuditMessage) i.getArgument(0)).getFormattedMessage().contains("Command=CreateVolume")) {
-        Assertions.fail("transaction and command not found");
-      }
-      // throw another exception to change to new exception to avoid terminate call
-      throw new OMException("test", OMException.ResultCodes.VOLUME_IS_REFERENCED);
-    }).when(auditLogger).logWrite(any());
-    CompletableFuture<Message> messageCompletableFuture = ozoneManagerStateMachine.applyTransaction(submittedTrx);
-    try {
-      messageCompletableFuture.get();
-    } catch (Exception ex) {
-      // do nothing
-    }
-  }
-
-  private TransactionContext mockTransactionContext(OMRequest request) {
+    RaftProtos.RaftConfigurationProto config =
+        RaftProtos.RaftConfigurationProto.newBuilder()
+            .addPeers(peer1)
+            .addPeers(peer2)
+            .addListeners(listener)
+            .build();
+
+    sm.notifyConfigurationChanged(1, 5, config);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+    verify(om).updatePeerList(captor.capture());
+    List<String> peerIds = captor.getValue();
+    assertEquals(3, peerIds.size());
+    assertTrue(peerIds.contains("om1"));
+    assertTrue(peerIds.contains("om2"));
+    assertTrue(peerIds.contains("om3"));
+  }
+
+  // --- notifySnapshotInstalled tests ---
+
+  @Test
+  public void testNotifySnapshotInstalledSuccess() {
+    RaftPeer localPeer = RaftPeer.newBuilder()
+        .setId("om1").setAddress("localhost:9862").build();
+
+    OzoneManagerRatisServer ratisServer = mock(OzoneManagerRatisServer.class);
+    RaftServer.Division division = mock(RaftServer.Division.class);
+    when(division.getPeer()).thenReturn(localPeer);
+    when(ratisServer.getServerDivision()).thenReturn(division);
+    when(om.getOmRatisServer()).thenReturn(ratisServer);
+
+    OmRatisSnapshotProvider snapshotProvider = mock(OmRatisSnapshotProvider.class);
+    when(om.getOmSnapshotProvider()).thenReturn(snapshotProvider);
+
+    sm.notifySnapshotInstalled(
+        RaftProtos.InstallSnapshotResult.SUCCESS, 100, localPeer);
+
+    verify(snapshotProvider).init();
+  }
+
+  @Test
+  public void testNotifySnapshotInstalledDifferentPeer() {
+    RaftPeer localPeer = RaftPeer.newBuilder()
+        .setId("om1").setAddress("localhost:9862").build();
+    RaftPeer remotePeer = RaftPeer.newBuilder()
+        .setId("om2").setAddress("remote:9862").build();
+
+    OzoneManagerRatisServer ratisServer = mock(OzoneManagerRatisServer.class);
+    RaftServer.Division division = mock(RaftServer.Division.class);
+    when(division.getPeer()).thenReturn(localPeer);
+    when(ratisServer.getServerDivision()).thenReturn(division);
+    when(om.getOmRatisServer()).thenReturn(ratisServer);
+
+    OmRatisSnapshotProvider snapshotProvider = mock(OmRatisSnapshotProvider.class);
+    when(om.getOmSnapshotProvider()).thenReturn(snapshotProvider);
+
+    sm.notifySnapshotInstalled(
+        RaftProtos.InstallSnapshotResult.SUCCESS, 100, remotePeer);
+
+    // Different peer, init() should NOT be called
+    verify(snapshotProvider, never()).init();
+  }
+
+  // --- notifyLeaderReady tests ---
+
+  @Test
+  public void testNotifyLeaderReady() {
+    OmSnapshotManager snapshotManager = mock(OmSnapshotManager.class);
+    when(om.getOmSnapshotManager()).thenReturn(snapshotManager);
+
+    sm.notifyLeaderReady();
+
+    verify(snapshotManager).resetInFlightSnapshotCount();
+  }
+
+  // --- getLatestSnapshot tests ---
+
+  @Test
+  public void testGetLatestSnapshot() {
+    TransactionInfo txnInfo = TransactionInfo.valueOf(TermIndex.valueOf(2, 10));
+    when(om.getTransactionInfo()).thenReturn(txnInfo);
+
+    SnapshotInfo snapshot = sm.getLatestSnapshot();
+
+    assertNotNull(snapshot);
+    assertEquals(2, snapshot.getTermIndex().getTerm());
+    assertEquals(10, snapshot.getTermIndex().getIndex());
+  }
+
+  // --- pause / stop / close lifecycle tests ---
+
+  @Test
+  public void testPauseStopsDoubleBuffer() {
+    sm.pause();
+
+    verify(doubleBuffer).stop();
+  }
+
+  @Test
+  public void testPauseAlreadyPaused() {
+    // First pause
+    sm.pause();
+    // Second pause should not throw
+    sm.pause();
+
+    verify(doubleBuffer, times(2)).stop();
+  }
+
+  @Test
+  public void testStopShutdownsResources() {
+    sm.stop();
+
+    verify(doubleBuffer).stop();
+    assertTrue(executor.isShutdown());
+  }
+
+  @Test
+  public void testCloseWhenOmRunning() {
+    when(om.isStopped()).thenReturn(false);
+
+    sm.close();
+
+    verify(om).shutDown(anyString());
+  }
+
+  @Test
+  public void testCloseWhenOmAlreadyStopped() {
+    when(om.isStopped()).thenReturn(true);
+
+    sm.close();
+
+    // stop() calls doubleBuffer.stop()
+    verify(doubleBuffer).stop();
+  }
+
+  // --- other utility method tests ---
+
+  @Test
+  public void testToStateMachineLogEntryString() {
+    OMRequest request = sampleWriteRequest();
+    RaftProtos.StateMachineLogEntryProto smEntry =
+        RaftProtos.StateMachineLogEntryProto.newBuilder()
+            .setLogData(OMRatisHelper.convertRequestToByteString(request))
+            .build();
+
+    String result = sm.toStateMachineLogEntryString(smEntry);
+
+    assertNotNull(result);
+    assertTrue(result.contains("CreateKey") || result.contains("create_key"),
+        "Expected result to contain command type, got: " + result);
+  }
+
+  @Test
+  public void testAwaitDoubleBufferFlush() throws Exception {
+    sm.awaitDoubleBufferFlush();
+
+    verify(doubleBuffer).awaitFlush();
+  }
+
+  // --- private helpers ---
+
+  private static void assertTermIndex(long expectedTerm, long expectedIndex, TermIndex computed) {
+    assertEquals(expectedTerm, computed.getTerm());
+    assertEquals(expectedIndex, computed.getIndex());
+  }
+
+  private OMRequest sampleWriteRequest() {
+    return OMRequest.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setClientId("test-client")
+        .setCreateKeyRequest(CreateKeyRequest.newBuilder()
+            .setKeyArgs(KeyArgs.newBuilder()
+                .setVolumeName("vol")
+                .setBucketName("bucket")
+                .setKeyName("key")))
+        .setUserInfo(UserInfo.newBuilder()
+            .setUserName("user")
+            .setHostName("localhost")
+            .setRemoteAddress("127.0.0.1"))
+        .build();
+  }
+
+  private OMRequest sampleReadRequest() {
+    return OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setClientId("test-client")
+        .build();
+  }
+
+  private TransactionContext mockTrx(OMRequest request, long term, long index) {
+    return mockTrx(request, term, index, request);
+  }
+
+  private TransactionContext mockTrx(OMRequest request, long term, long index, Object context) {
     RaftProtos.StateMachineLogEntryProto logEntry =
         RaftProtos.StateMachineLogEntryProto.newBuilder()
             .setLogData(OMRatisHelper.convertRequestToByteString(request))
             .build();
+    TransactionContext trx = mock(TransactionContext.class);
+    when(trx.getStateMachineLogEntry()).thenReturn(logEntry);
+    when(trx.getStateMachineContext()).thenReturn(context);
+    RaftProtos.LogEntryProto logEntryProto = LogProtoUtils.toLogEntryProto(term, index, index);
+    when(trx.getLogEntry()).thenReturn(logEntryProto);
+    return trx;
+  }
 
-    TransactionContext mockTrx = mock(TransactionContext.class);
-    when(mockTrx.getStateMachineLogEntry()).thenReturn(logEntry);
-    when(mockTrx.getStateMachineContext()).thenReturn(request);
-    RaftProtos.LogEntryProto logEntryProto = LogProtoUtils.toLogEntryProto(10, 10, 10);
-    when(mockTrx.getLogEntry()).thenReturn(logEntryProto);
+  private AuditMessage dummyAuditMessage() {
+    return new AuditMessage.Builder()
+        .forOperation(() -> "LEADER_CHANGE")
+        .withResult(AuditEventStatus.SUCCESS)
+        .build();
+  }
+
+  private RaftGroupId initializeStateMachine() throws IOException {
+    RaftGroupId groupId = RaftGroupId.randomId();
+    RaftServer server = mock(RaftServer.class);
+    RaftStorage storage = mock(RaftStorage.class);
+    sm.initialize(server, groupId, storage);
+    return groupId;
+  }
 
-    return mockTrx;
+  private RaftClientRequest buildClientRequest(
+      RaftGroupId groupId, OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(ClientId.randomId())
+        .setServerId(RaftPeerId.valueOf("om1"))
+        .setGroupId(groupId)
+        .setCallId(1L)
+        .setMessage(Message.valueOf(
+            OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/protocolPB/TestOzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/protocolPB/TestOzoneManagerRequestHandler.java
index a35551e246c..84357f7db35 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/protocolPB/TestOzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/protocolPB/TestOzoneManagerRequestHandler.java
@@ -29,8 +29,13 @@
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
 import org.apache.hadoop.ozone.om.OmConfig;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
 import org.apache.hadoop.ozone.om.helpers.BasicOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.ListKeysLightResult;
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
@@ -40,10 +45,12 @@
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 /**
@@ -242,4 +249,66 @@ public void testListKeysLightEncryptionFromOmKeyInfo() throws IOException {
     Assertions.assertTrue(basicKeyInfoList.get(0).getIsEncrypted(), "encrypted-key should have isEncrypted=true");
     Assertions.assertFalse(basicKeyInfoList.get(1).getIsEncrypted(), "normal-key should have isEncrypted=false");
   }
+
+  /**
+   * Verify that when handleWriteRequestImpl encounters an exception during
+   * validateAndUpdateCache, the audit logger is called with the correct
+   * Transaction index and Command type in the audit message.
+   */
+  @Test
+  public void testWriteRequestExceptionAuditLog() {
+    OzoneManagerRequestHandler handler = getRequestHandler(10);
+    OzoneManager ozoneManager = handler.getOzoneManager();
+
+    // Set up audit logger mock
+    AuditLogger auditLogger = Mockito.mock(AuditLogger.class);
+    Mockito.when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+
+    // Set up perf metrics (needed by captureLatencyNs)
+    OMPerformanceMetrics perfMetrics = Mockito.mock(OMPerformanceMetrics.class);
+    Mockito.when(perfMetrics.getValidateAndUpdateCacheLatencyNs())
+        .thenReturn(Mockito.mock(MutableRate.class));
+    Mockito.when(ozoneManager.getPerfMetrics()).thenReturn(perfMetrics);
+
+    // Disable ACLs so preExecute doesn't need ACL setup
+    Mockito.when(ozoneManager.getAclsEnabled()).thenReturn(false);
+
+    // Leave getMetrics() returning null -> NPE in validateAndUpdateCache
+    // when OMVolumeCreateRequest calls ozoneManager.getMetrics().incNumVolumeCreates()
+
+    // Build a CreateVolume OMRequest
+    OzoneManagerProtocolProtos.VolumeInfo volInfo =
+        OzoneManagerProtocolProtos.VolumeInfo.newBuilder()
+            .setAdminName("admin").setOwnerName("owner")
+            .setVolume("testVol").build();
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+            .setCreateVolumeRequest(
+                OzoneManagerProtocolProtos.CreateVolumeRequest.newBuilder()
+                    .setVolumeInfo(volInfo))
+            .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+            .setClientId("test-client")
+            .setUserInfo(OzoneManagerProtocolProtos.UserInfo.newBuilder()
+                .setUserName("testUser").setHostName("localhost")
+                .setRemoteAddress("127.0.0.1"))
+            .build();
+
+    ExecutionContext context = ExecutionContext.of(10, TermIndex.valueOf(1, 10));
+
+    // handleWriteRequestImpl should throw (NPE from null getMetrics())
+    // but the catch block logs the audit message first
+    Assertions.assertThrows(NullPointerException.class,
+        () -> handler.handleWriteRequestImpl(omRequest, context));
+
+    // Verify auditLogger.logWrite was called with correct content
+    ArgumentCaptor<AuditMessage> captor =
+        ArgumentCaptor.forClass(AuditMessage.class);
+    Mockito.verify(auditLogger).logWrite(captor.capture());
+
+    String formatted = captor.getValue().getFormattedMessage();
+    Assertions.assertTrue(formatted.contains("Transaction") && formatted.contains("10"),
+        "Audit message should contain Transaction 10, got: " + formatted);
+    Assertions.assertTrue(formatted.contains("Command") && formatted.contains("CreateVolume"),
+        "Audit message should contain Command CreateVolume, got: " + formatted);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to