This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d7f64221c3 HDDS-9876. OM state machine should add response for every write request to the double-buffer (#5749)
d7f64221c3 is described below

commit d7f64221c335d7316c53594e6e153568f7a1e595
Author: Sammi Chen <[email protected]>
AuthorDate: Wed Dec 27 02:44:29 2023 +0800

    HDDS-9876. OM state machine should add response for every write request to the double-buffer (#5749)
---
 .../client/rpc/TestOzoneRpcClientAbstract.java     |   7 ++
 .../client/rpc/TestOzoneRpcClientWithRatis.java    | 118 +++++++++++++++++++++
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  48 +++++++--
 .../metrics/OzoneManagerStateMachineMetrics.java   |  98 +++++++++++++++++
 .../ozone/om/response/DummyOMClientResponse.java   |  44 ++++++++
 .../protocolPB/OzoneManagerRequestHandler.java     |  25 +++++
 .../ozone/om/response/TestCleanupTableInfo.java    |   1 +
 7 files changed, 335 insertions(+), 6 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index bd0e7af50f..8f1315c27c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -270,6 +270,13 @@ public abstract class TestOzoneRpcClientAbstract {
     TestOzoneRpcClientAbstract.clusterId = clusterId;
   }
 
+  /**
+   * Returns the client held in the static {@code ozClient} field of
+   * {@link TestOzoneRpcClientAbstract}.
+   */
+  public static OzoneClient getClient() {
+    return TestOzoneRpcClientAbstract.ozClient;
+  }
+
+  /**
+   * Returns the cluster held in the static {@code cluster} field of
+   * {@link TestOzoneRpcClientAbstract}.
+   */
+  public static MiniOzoneCluster getCluster() {
+    return TestOzoneRpcClientAbstract.cluster;
+  }
+
   /**
    * Test OM Proxy Provider.
    */
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index a8343c7512..19e111d65d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -26,13 +26,16 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -49,17 +52,22 @@ import 
org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
+import 
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
 import org.apache.ozone.test.GenericTestUtils;
+import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * This class is to test all the public facing APIs of Ozone Client with an
@@ -284,4 +292,114 @@ public class TestOzoneRpcClientWithRatis extends 
TestOzoneRpcClientAbstract {
     }
     assertArrayEquals(data, buffer);
   }
+
+  /**
+   * Issues a bucket delete and a key create for the same bucket in parallel
+   * while the OM write-request handler is paused, then verifies that the
+   * state machine logged the failed write and that the applyTransactionMap
+   * size reported by the metrics returns to zero after further writes.
+   */
+  @Test
+  public void testParallelDeleteBucketAndCreateKey() throws IOException,
+      InterruptedException, TimeoutException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = "sample value";
+    getStore().createVolume(volumeName);
+    OzoneVolume volume = getStore().getVolume(volumeName);
+    volume.createBucket(bucketName);
+    String keyName = UUID.randomUUID().toString();
+
+    OzoneManagerStateMachine omSM = getCluster().getOzoneManager()
+        .getOmRatisServer().getOmStateMachine();
+    OzoneManagerStateMachineMetrics metrics = omSM.getMetrics();
+
+    Thread thread1 = new Thread(() -> {
+      try {
+        volume.deleteBucket(bucketName);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    Thread thread2 = new Thread(() -> {
+      try {
+        getClient().getProxy().createKey(volumeName, bucketName, keyName,
+            0, ReplicationType.RATIS, ONE, new HashMap<>());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    GenericTestUtils.LogCapturer omSMLog = GenericTestUtils.LogCapturer
+        .captureLogs(OzoneManagerStateMachine.LOG);
+    try {
+      OMRequestHandlerPauseInjector injector =
+          new OMRequestHandlerPauseInjector();
+      omSM.getHandler().setInjector(injector);
+      try {
+        thread1.start();
+        thread2.start();
+        GenericTestUtils.waitFor(
+            () -> metrics.getApplyTransactionMapSize() > 0, 100, 5000);
+        // NOTE(review): presumably gives both requests time to reach the OM
+        // before the handler is released - confirm the intent.
+        Thread.sleep(2000);
+        injector.resume();
+
+        // InterruptedException is declared by this method, so let it
+        // propagate instead of wrapping it.
+        thread1.join();
+        thread2.join();
+      } finally {
+        // Always remove the injector so a failure above does not leave it
+        // installed on the handler.
+        omSM.getHandler().setInjector(null);
+      }
+
+      // Generate more write requests to OM
+      String newBucketName = UUID.randomUUID().toString();
+      volume.createBucket(newBucketName);
+      OzoneBucket bucket = volume.getBucket(newBucketName);
+      for (int i = 0; i < 10; i++) {
+        bucket.createKey("key-" + i, value.getBytes(UTF_8).length,
+            ReplicationType.RATIS, ONE, new HashMap<>());
+      }
+
+      Assert.assertTrue(
+          omSMLog.getOutput().contains("Failed to write, Exception occurred"));
+      GenericTestUtils.waitFor(
+          () -> metrics.getApplyTransactionMapSize() == 0, 100, 5000);
+    } finally {
+      // Stop capturing the state machine log once this test is done.
+      omSMLog.stopCapturing();
+    }
+  }
+
+  /**
+   * Fault injector that blocks every {@link #pause()} caller until
+   * {@link #resume()} is called. {@link #resume()} itself waits until at
+   * least one caller has entered {@link #pause()}.
+   */
+  private static class OMRequestHandlerPauseInjector extends FaultInjector {
+    // Counted down by pause() to signal that a caller reached the pause point.
+    private CountDownLatch ready;
+    // Counted down by resume() to release the callers blocked in pause().
+    private CountDownLatch wait;
+
+    OMRequestHandlerPauseInjector() {
+      init();
+    }
+
+    /** Creates fresh latches, so the injector can be used for a new cycle. */
+    @Override
+    public void init() {
+      this.ready = new CountDownLatch(1);
+      this.wait = new CountDownLatch(1);
+    }
+
+    /**
+     * Signals that the pause point was reached and blocks until resumed.
+     *
+     * @throws IOException if the calling thread is interrupted while blocked
+     */
+    @Override
+    public void pause() throws IOException {
+      ready.countDown();
+      try {
+        wait.await();
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller's thread.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * Waits for a caller to enter {@link #pause()} and then releases it.
+     * Fails the test if the waiting thread is interrupted.
+     */
+    @Override
+    public void resume() throws IOException {
+      // Make sure injector pauses before resuming.
+      try {
+        ready.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        // fail() always throws; passing the cause keeps the stack trace in
+        // the test report instead of printing it to stderr.
+        fail("resume interrupted", e);
+      }
+      wait.countDown();
+    }
+
+    /** Re-arms the injector by replacing both latches. */
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 2b2ef704b2..5a9faa301e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -40,8 +41,10 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import 
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -85,7 +88,7 @@ import static 
org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
  */
 public class OzoneManagerStateMachine extends BaseStateMachine {
 
-  static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(OzoneManagerStateMachine.class);
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
@@ -109,6 +112,7 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   // conf/metadata entries which are received through notifyIndexUpdate.
   private ConcurrentMap<Long, Long> ratisTransactionMap =
       new ConcurrentSkipListMap<>();
+  private OzoneManagerStateMachineMetrics metrics;
 
 
   public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer,
@@ -134,6 +138,7 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
         .setNameFormat(threadPrefix + "InstallSnapshotThread").build();
     this.installSnapshotExecutor =
         HadoopExecutors.newSingleThreadExecutor(installSnapshotThreadFactory);
+    this.metrics = OzoneManagerStateMachineMetrics.create();
   }
 
   /**
@@ -584,7 +589,7 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
       }
     } catch (IOException e) {
       LOG.warn("Failed to write, Exception occurred ", e);
-      return createErrorResponse(request, e);
+      return createErrorResponse(request, e, trxLogIndex);
     } catch (Throwable e) {
       // For any Runtime exceptions, terminate OM.
       String errorMessage = "Request " + request + " failed with exception";
@@ -594,16 +599,20 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   }
 
   private OMResponse createErrorResponse(
-      OMRequest omRequest, IOException exception) {
-    OMResponse.Builder omResponse = OMResponse.newBuilder()
+      OMRequest omRequest, IOException exception, long trxIndex) {
+    OMResponse.Builder omResponseBuilder = OMResponse.newBuilder()
         .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
         .setCmdType(omRequest.getCmdType())
         .setTraceID(omRequest.getTraceID())
         .setSuccess(false);
     if (exception.getMessage() != null) {
-      omResponse.setMessage(exception.getMessage());
+      omResponseBuilder.setMessage(exception.getMessage());
     }
-    return omResponse.build();
+    OMResponse omResponse = omResponseBuilder.build();
+    OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse);
+    omClientResponse.setFlushFuture(
+        ozoneManagerDoubleBuffer.add(omClientResponse, trxIndex));
+    return omResponse;
   }
 
   /**
@@ -683,6 +692,8 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
         }
       }
     }
+    this.metrics.updateApplyTransactionMapSize(applyTransactionMap.size());
+    this.metrics.updateRatisTransactionMapSize(ratisTransactionMap.size());
   }
 
   public void loadSnapshotInfoFromDB() throws IOException {
@@ -723,15 +734,40 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
     this.handler = handler;
   }
 
+  @VisibleForTesting
+  public OzoneManagerRequestHandler getHandler() {
+    return (OzoneManagerRequestHandler) this.handler;
+  }
+
   @VisibleForTesting
   public void setRaftGroupId(RaftGroupId raftGroupId) {
     this.raftGroupId = raftGroupId;
   }
 
+  @VisibleForTesting
+  public OzoneManagerStateMachineMetrics getMetrics() {
+    return this.metrics;
+  }
+
   public void stop() {
     ozoneManagerDoubleBuffer.stop();
     HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
     HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, 
TimeUnit.SECONDS);
+    LOG.info("applyTransactionMap size {} ", applyTransactionMap.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("applyTransactionMap {}",
+          applyTransactionMap.keySet().stream().map(Object::toString)
+              .collect(Collectors.joining(",")));
+    }
+    LOG.info("ratisTransactionMap size {}", ratisTransactionMap.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ratisTransactionMap {}",
+          ratisTransactionMap.keySet().stream().map(Object::toString)
+            .collect(Collectors.joining(",")));
+    }
+    if (metrics != null) {
+      metrics.unRegister();
+    }
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
new file mode 100644
index 0000000000..8ab4fe883a
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Class which maintains metrics related to OzoneManager state machine.
+ */
+@Metrics(about = "OzoneManagerStateMachine Metrics", context = 
OzoneConsts.OZONE)
+public final class OzoneManagerStateMachineMetrics implements MetricsSource {
+
+  private static final String SOURCE_NAME =
+      OzoneManagerStateMachineMetrics.class.getSimpleName();
+  private MetricsRegistry registry;
+  private static OzoneManagerStateMachineMetrics instance;
+
+  @Metric(about = "Number of apply transactions in applyTransactionMap.")
+  private MutableCounterLong applyTransactionMapSize;
+
+  @Metric(about = "Number of ratis transactions in ratisTransactionMap.")
+  private MutableCounterLong ratisTransactionMapSize;
+
+  private OzoneManagerStateMachineMetrics() {
+    registry = new MetricsRegistry(SOURCE_NAME);
+  }
+
+  public static synchronized OzoneManagerStateMachineMetrics create() {
+    if (instance != null) {
+      return instance;
+    } else {
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      OzoneManagerStateMachineMetrics metrics = new 
OzoneManagerStateMachineMetrics();
+      instance = ms.register(SOURCE_NAME, "OzoneManager StateMachine Metrics",
+          metrics);
+      return instance;
+    }
+  }
+
+  @VisibleForTesting
+  public long getApplyTransactionMapSize() {
+    return applyTransactionMapSize.value();
+  }
+
+  @VisibleForTesting
+  public long getRatisTransactionMapSize() {
+    return ratisTransactionMapSize.value();
+  }
+
+  public void updateApplyTransactionMapSize(long size) {
+    this.applyTransactionMapSize.incr(
+        Math.negateExact(applyTransactionMapSize.value()) + size);
+  }
+
+  public void updateRatisTransactionMapSize(long size) {
+    this.ratisTransactionMapSize.incr(
+        Math.negateExact(ratisTransactionMapSize.value()) + size);
+  }
+
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder rb = collector.addRecord(SOURCE_NAME);
+
+    applyTransactionMapSize.snapshot(rb, all);
+    ratisTransactionMapSize.snapshot(rb, all);
+    rb.endRecord();
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/DummyOMClientResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/DummyOMClientResponse.java
new file mode 100644
index 0000000000..0f6a11889f
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/DummyOMClientResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * An {@link OMClientResponse} that only carries an {@link OMResponse} and
+ * contributes nothing to the DB batch. OzoneManagerStateMachine wraps the
+ * error response of a failed write request in this class before adding it
+ * to the double buffer.
+ */
+@CleanupTableInfo
+public class DummyOMClientResponse extends OMClientResponse {
+
+  public DummyOMClientResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    // Intentionally empty: this response has nothing to write to the batch.
+  }
+}
+
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 483aa2b1ba..f6284393ed 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.lang3.StringUtils;
@@ -36,6 +37,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipReques
 import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.common.PayloadUtils;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -170,6 +172,7 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
       LoggerFactory.getLogger(OzoneManagerRequestHandler.class);
   private final OzoneManager impl;
   private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+  private FaultInjector injector;
 
   public OzoneManagerRequestHandler(OzoneManager om,
       OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) {
@@ -389,6 +392,7 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
   @Override
   public OMClientResponse handleWriteRequest(OMRequest omRequest,
       long transactionLogIndex) throws IOException {
+    injectPause();
     OMClientRequest omClientRequest =
         OzoneManagerRatisUtils.createClientRequest(omRequest, impl);
     return captureLatencyNs(
@@ -411,6 +415,27 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
     this.ozoneManagerDoubleBuffer = omDoubleBuffer;
   }
 
+  /**
+   * Installs the fault injector consulted at the start of
+   * {@link #handleWriteRequest}. Pass {@code null} to remove it.
+   *
+   * @param injector the injector to use, or {@code null} for none
+   */
+  @VisibleForTesting
+  public void setInjector(FaultInjector injector) {
+    this.injector = injector;
+  }
+
+  /**
+   * @return the currently installed fault injector, or {@code null} if none
+   */
+  @VisibleForTesting
+  public FaultInjector getInjector() {
+    return injector;
+  }
+
+  /**
+   * Inject pause for test only.
+   *
+   * @throws IOException if the installed injector's pause fails
+   */
+  private void injectPause() throws IOException {
+    // Read the field once: setInjector(null) may run on another thread
+    // between the null check and the call.
+    final FaultInjector current = injector;
+    if (current != null) {
+      current.pause();
+    }
+  }
+
   private DBUpdatesResponse getOMDBUpdates(
       DBUpdatesRequest dbUpdatesRequest)
       throws IOException {
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index 9958f0a088..10277184bd 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -148,6 +148,7 @@ public class TestCleanupTableInfo {
     subTypes.remove(OmKeyResponse.class);
     // OMEchoRPCWriteResponse does not need CleanupTable.
     subTypes.remove(OMEchoRPCWriteResponse.class);
+    subTypes.remove(DummyOMClientResponse.class);
     subTypes.forEach(aClass -> {
       Assertions.assertTrue(aClass.isAnnotationPresent(CleanupTableInfo.class),
           aClass + " does not have annotation of" +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to