This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d7f64221c3 HDDS-9876. OM state machine should add response for every
write request to the double-buffer (#5749)
d7f64221c3 is described below
commit d7f64221c335d7316c53594e6e153568f7a1e595
Author: Sammi Chen <[email protected]>
AuthorDate: Wed Dec 27 02:44:29 2023 +0800
HDDS-9876. OM state machine should add response for every write request to
the double-buffer (#5749)
---
.../client/rpc/TestOzoneRpcClientAbstract.java | 7 ++
.../client/rpc/TestOzoneRpcClientWithRatis.java | 118 +++++++++++++++++++++
.../ozone/om/ratis/OzoneManagerStateMachine.java | 48 +++++++--
.../metrics/OzoneManagerStateMachineMetrics.java | 98 +++++++++++++++++
.../ozone/om/response/DummyOMClientResponse.java | 44 ++++++++
.../protocolPB/OzoneManagerRequestHandler.java | 25 +++++
.../ozone/om/response/TestCleanupTableInfo.java | 1 +
7 files changed, 335 insertions(+), 6 deletions(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index bd0e7af50f..8f1315c27c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -270,6 +270,13 @@ public abstract class TestOzoneRpcClientAbstract {
TestOzoneRpcClientAbstract.clusterId = clusterId;
}
+ public static OzoneClient getClient() {
+ return TestOzoneRpcClientAbstract.ozClient;
+ }
+
+ public static MiniOzoneCluster getCluster() {
+ return TestOzoneRpcClientAbstract.cluster;
+ }
/**
* Test OM Proxy Provider.
*/
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index a8343c7512..19e111d65d 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -26,13 +26,16 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -49,17 +52,22 @@ import
org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
+import
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
import org.apache.ozone.test.GenericTestUtils;
+import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This class is to test all the public facing APIs of Ozone Client with an
@@ -284,4 +292,114 @@ public class TestOzoneRpcClientWithRatis extends
TestOzoneRpcClientAbstract {
}
assertArrayEquals(data, buffer);
}
+
+ @Test
+ public void testParallelDeleteBucketAndCreateKey() throws IOException,
+ InterruptedException, TimeoutException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+
+ String value = "sample value";
+ getStore().createVolume(volumeName);
+ OzoneVolume volume = getStore().getVolume(volumeName);
+ volume.createBucket(bucketName);
+ String keyName = UUID.randomUUID().toString();
+
+ GenericTestUtils.LogCapturer omSMLog = GenericTestUtils.LogCapturer
+ .captureLogs(OzoneManagerStateMachine.LOG);
+ OzoneManagerStateMachine omSM = getCluster().getOzoneManager()
+ .getOmRatisServer().getOmStateMachine();
+ OzoneManagerStateMachineMetrics metrics = omSM.getMetrics();
+
+ Thread thread1 = new Thread(() -> {
+ try {
+ volume.deleteBucket(bucketName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ Thread thread2 = new Thread(() -> {
+ try {
+ getClient().getProxy().createKey(volumeName, bucketName, keyName,
+ 0, ReplicationType.RATIS, ONE, new HashMap<>());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ OMRequestHandlerPauseInjector injector =
+ new OMRequestHandlerPauseInjector();
+ omSM.getHandler().setInjector(injector);
+ thread1.start();
+ thread2.start();
+ GenericTestUtils.waitFor(() -> metrics.getApplyTransactionMapSize() > 0,
+ 100, 5000);
+ Thread.sleep(2000);
+ injector.resume();
+
+ try {
+ thread1.join();
+ thread2.join();
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ omSM.getHandler().setInjector(null);
+ // Generate more write requests to OM
+ String newBucketName = UUID.randomUUID().toString();
+ volume.createBucket(newBucketName);
+ OzoneBucket bucket = volume.getBucket(newBucketName);
+ for (int i = 0; i < 10; i++) {
+ bucket.createKey("key-" + i, value.getBytes(UTF_8).length,
+ ReplicationType.RATIS, ONE, new HashMap<>());
+ }
+
+ Assert.assertTrue(
+ omSMLog.getOutput().contains("Failed to write, Exception occurred"));
+ GenericTestUtils.waitFor(() -> metrics.getApplyTransactionMapSize() == 0,
+ 100, 5000);
+ }
+
+ private static class OMRequestHandlerPauseInjector extends FaultInjector {
+ private CountDownLatch ready;
+ private CountDownLatch wait;
+
+ OMRequestHandlerPauseInjector() {
+ init();
+ }
+
+ @Override
+ public void init() {
+ this.ready = new CountDownLatch(1);
+ this.wait = new CountDownLatch(1);
+ }
+
+ @Override
+ public void pause() throws IOException {
+ ready.countDown();
+ try {
+ wait.await();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void resume() throws IOException {
+ // Make sure injector pauses before resuming.
+ try {
+ ready.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ assertTrue(fail("resume interrupted"));
+ }
+ wait.countDown();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ init();
+ }
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 2b2ef704b2..5a9faa301e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -40,8 +41,10 @@ import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerStateMachineMetrics;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -85,7 +88,7 @@ import static
org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
*/
public class OzoneManagerStateMachine extends BaseStateMachine {
- static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(OzoneManagerStateMachine.class);
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
@@ -109,6 +112,7 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
// conf/metadata entries which are received through notifyIndexUpdate.
private ConcurrentMap<Long, Long> ratisTransactionMap =
new ConcurrentSkipListMap<>();
+ private OzoneManagerStateMachineMetrics metrics;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer,
@@ -134,6 +138,7 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
.setNameFormat(threadPrefix + "InstallSnapshotThread").build();
this.installSnapshotExecutor =
HadoopExecutors.newSingleThreadExecutor(installSnapshotThreadFactory);
+ this.metrics = OzoneManagerStateMachineMetrics.create();
}
/**
@@ -584,7 +589,7 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
}
} catch (IOException e) {
LOG.warn("Failed to write, Exception occurred ", e);
- return createErrorResponse(request, e);
+ return createErrorResponse(request, e, trxLogIndex);
} catch (Throwable e) {
// For any Runtime exceptions, terminate OM.
String errorMessage = "Request " + request + " failed with exception";
@@ -594,16 +599,20 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
}
private OMResponse createErrorResponse(
- OMRequest omRequest, IOException exception) {
- OMResponse.Builder omResponse = OMResponse.newBuilder()
+ OMRequest omRequest, IOException exception, long trxIndex) {
+ OMResponse.Builder omResponseBuilder = OMResponse.newBuilder()
.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
.setCmdType(omRequest.getCmdType())
.setTraceID(omRequest.getTraceID())
.setSuccess(false);
if (exception.getMessage() != null) {
- omResponse.setMessage(exception.getMessage());
+ omResponseBuilder.setMessage(exception.getMessage());
}
- return omResponse.build();
+ OMResponse omResponse = omResponseBuilder.build();
+ OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse);
+ omClientResponse.setFlushFuture(
+ ozoneManagerDoubleBuffer.add(omClientResponse, trxIndex));
+ return omResponse;
}
/**
@@ -683,6 +692,8 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
}
}
}
+ this.metrics.updateApplyTransactionMapSize(applyTransactionMap.size());
+ this.metrics.updateRatisTransactionMapSize(ratisTransactionMap.size());
}
public void loadSnapshotInfoFromDB() throws IOException {
@@ -723,15 +734,40 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
this.handler = handler;
}
+ @VisibleForTesting
+ public OzoneManagerRequestHandler getHandler() {
+ return (OzoneManagerRequestHandler) this.handler;
+ }
+
@VisibleForTesting
public void setRaftGroupId(RaftGroupId raftGroupId) {
this.raftGroupId = raftGroupId;
}
+ @VisibleForTesting
+ public OzoneManagerStateMachineMetrics getMetrics() {
+ return this.metrics;
+ }
+
public void stop() {
ozoneManagerDoubleBuffer.stop();
HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5,
TimeUnit.SECONDS);
+ LOG.info("applyTransactionMap size {} ", applyTransactionMap.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("applyTransactionMap {}",
+ applyTransactionMap.keySet().stream().map(Object::toString)
+ .collect(Collectors.joining(",")));
+ }
+ LOG.info("ratisTransactionMap size {}", ratisTransactionMap.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ratisTransactionMap {}",
+ ratisTransactionMap.keySet().stream().map(Object::toString)
+ .collect(Collectors.joining(",")));
+ }
+ if (metrics != null) {
+ metrics.unRegister();
+ }
}
@VisibleForTesting
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
new file mode 100644
index 0000000000..8ab4fe883a
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/metrics/OzoneManagerStateMachineMetrics.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Class which maintains metrics related to OzoneManager state machine.
+ */
+@Metrics(about = "OzoneManagerStateMachine Metrics", context =
OzoneConsts.OZONE)
+public final class OzoneManagerStateMachineMetrics implements MetricsSource {
+
+ private static final String SOURCE_NAME =
+ OzoneManagerStateMachineMetrics.class.getSimpleName();
+ private MetricsRegistry registry;
+ private static OzoneManagerStateMachineMetrics instance;
+
+ @Metric(about = "Number of apply transactions in applyTransactionMap.")
+ private MutableCounterLong applyTransactionMapSize;
+
+ @Metric(about = "Number of ratis transactions in ratisTransactionMap.")
+ private MutableCounterLong ratisTransactionMapSize;
+
+ private OzoneManagerStateMachineMetrics() {
+ registry = new MetricsRegistry(SOURCE_NAME);
+ }
+
+ public static synchronized OzoneManagerStateMachineMetrics create() {
+ if (instance != null) {
+ return instance;
+ } else {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ OzoneManagerStateMachineMetrics metrics = new
OzoneManagerStateMachineMetrics();
+ instance = ms.register(SOURCE_NAME, "OzoneManager StateMachine Metrics",
+ metrics);
+ return instance;
+ }
+ }
+
+ @VisibleForTesting
+ public long getApplyTransactionMapSize() {
+ return applyTransactionMapSize.value();
+ }
+
+ @VisibleForTesting
+ public long getRatisTransactionMapSize() {
+ return ratisTransactionMapSize.value();
+ }
+
+ public void updateApplyTransactionMapSize(long size) {
+ this.applyTransactionMapSize.incr(
+ Math.negateExact(applyTransactionMapSize.value()) + size);
+ }
+
+ public void updateRatisTransactionMapSize(long size) {
+ this.ratisTransactionMapSize.incr(
+ Math.negateExact(ratisTransactionMapSize.value()) + size);
+ }
+
+ public void unRegister() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(SOURCE_NAME);
+ }
+
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ MetricsRecordBuilder rb = collector.addRecord(SOURCE_NAME);
+
+ applyTransactionMapSize.snapshot(rb, all);
+ ratisTransactionMapSize.snapshot(rb, all);
+ rb.endRecord();
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/DummyOMClientResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/DummyOMClientResponse.java
new file mode 100644
index 0000000000..0f6a11889f
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/DummyOMClientResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * A dummy OMClientResponse implementation.
+ */
+@CleanupTableInfo
+public class DummyOMClientResponse extends OMClientResponse {
+
+ public DummyOMClientResponse(@Nonnull OMResponse omResponse) {
+ super(omResponse);
+ }
+
+ @Override
+ public void addToDBBatch(OMMetadataManager omMetadataManager,
+ BatchOperation batchOperation) throws IOException {
+ // do nothing
+ }
+}
+
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 483aa2b1ba..f6284393ed 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import org.apache.commons.lang3.StringUtils;
@@ -36,6 +37,7 @@ import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipReques
import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.common.PayloadUtils;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -170,6 +172,7 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
LoggerFactory.getLogger(OzoneManagerRequestHandler.class);
private final OzoneManager impl;
private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+ private FaultInjector injector;
public OzoneManagerRequestHandler(OzoneManager om,
OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) {
@@ -389,6 +392,7 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
@Override
public OMClientResponse handleWriteRequest(OMRequest omRequest,
long transactionLogIndex) throws IOException {
+ injectPause();
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, impl);
return captureLatencyNs(
@@ -411,6 +415,27 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
this.ozoneManagerDoubleBuffer = omDoubleBuffer;
}
+ @VisibleForTesting
+ public void setInjector(FaultInjector injector) {
+ this.injector = injector;
+ }
+
+ @VisibleForTesting
+ public FaultInjector getInjector() {
+ return injector;
+ }
+
+ /**
+ * Inject pause for test only.
+ *
+ * @throws IOException
+ */
+ private void injectPause() throws IOException {
+ if (injector != null) {
+ injector.pause();
+ }
+ }
+
private DBUpdatesResponse getOMDBUpdates(
DBUpdatesRequest dbUpdatesRequest)
throws IOException {
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index 9958f0a088..10277184bd 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -148,6 +148,7 @@ public class TestCleanupTableInfo {
subTypes.remove(OmKeyResponse.class);
// OMEchoRPCWriteResponse does not need CleanupTable.
subTypes.remove(OMEchoRPCWriteResponse.class);
+ subTypes.remove(DummyOMClientResponse.class);
subTypes.forEach(aClass -> {
Assertions.assertTrue(aClass.isAnnotationPresent(CleanupTableInfo.class),
aClass + " does not have annotation of" +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]