This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3739b0597b HDDS-12604. Reduce duplication in TestContainerStateMachine
(#8104)
3739b0597b is described below
commit 3739b0597beea2c53729fed1dac167434d3d5df8
Author: Chia-Chuan Yu <[email protected]>
AuthorDate: Tue Mar 25 20:13:03 2025 +0800
HDDS-12604. Reduce duplication in TestContainerStateMachine (#8104)
---
.../server/ratis/TestContainerStateMachine.java | 213 +++++++++++----------
1 file changed, 109 insertions(+), 104 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
index a0c543d723..f021ca3ffd 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -31,7 +32,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -79,6 +79,7 @@ abstract class TestContainerStateMachine {
.setNameFormat("ChunkWriter-" + i + "-%d")
.build())).collect(Collectors.toList());
private final boolean isLeader;
+ private static final String CONTAINER_DATA = "Test Data";
TestContainerStateMachine(boolean isLeader) {
this.isLeader = isLeader;
@@ -86,6 +87,8 @@ abstract class TestContainerStateMachine {
@BeforeEach
public void setup() throws IOException {
+ conf.setTimeDuration(HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
+ 1000_000_000, TimeUnit.NANOSECONDS);
dispatcher = mock(ContainerDispatcher.class);
ContainerController controller = mock(ContainerController.class);
XceiverServerRatis ratisServer = mock(XceiverServerRatis.class);
@@ -129,52 +132,26 @@ public void testWriteFailure(boolean failWithException)
throws ExecutionExceptio
TransactionContext trx = mock(TransactionContext.class);
ContainerStateMachine.Context context =
mock(ContainerStateMachine.Context.class);
when(trx.getStateMachineContext()).thenReturn(context);
- if (failWithException) {
- when(dispatcher.dispatch(any(), any())).thenThrow(new
RuntimeException());
- } else {
- when(dispatcher.dispatch(any(),
any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
- .newBuilder().setCmdType(ContainerProtos.Type.WriteChunk)
- .setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
- .build());
- }
-
when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
- .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
-
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test
Data"))
- .setBlockID(
-
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
- .setContainerID(1)
- .setDatanodeUuid(UUID.randomUUID().toString()).build());
- AtomicReference<Throwable> throwable = new AtomicReference<>(null);
- Function<Throwable, ? extends Message> throwableSetter = t -> {
- throwable.set(t);
- return null;
- };
- stateMachine.write(entry, trx).exceptionally(throwableSetter).get();
+ setUpMockDispatcherReturn(failWithException);
+ setUpMockRequestProtoReturn(context, 1, 1);
+
+ ThrowableCatcher catcher = new ThrowableCatcher();
+
+ stateMachine.write(entry, trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher,
times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
reset(dispatcher);
- assertNotNull(throwable.get());
- if (failWithException) {
- assertInstanceOf(RuntimeException.class, throwable.get());
- } else {
- assertInstanceOf(StorageContainerException.class, throwable.get());
- StorageContainerException sce = (StorageContainerException)
throwable.get();
- assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
sce.getResult());
- }
+ assertNotNull(catcher.getReceived());
+ assertResults(failWithException, catcher.getCaught());
+
// Writing data to another container(containerId 2) should also fail.
-
when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
- .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
-
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test
Data"))
- .setBlockID(
-
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build())
- .setContainerID(2)
- .setDatanodeUuid(UUID.randomUUID().toString()).build());
- stateMachine.write(entryNext, trx).exceptionally(throwableSetter).get();
+ setUpMockRequestProtoReturn(context, 2, 1);
+ stateMachine.write(entryNext, trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher,
times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
- assertInstanceOf(StorageContainerException.class, throwable.get());
- StorageContainerException sce = (StorageContainerException)
throwable.get();
+ assertInstanceOf(StorageContainerException.class, catcher.getReceived());
+ StorageContainerException sce = (StorageContainerException)
catcher.getReceived();
assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());
}
@@ -189,66 +166,39 @@ public void testApplyTransactionFailure(boolean
failWithException) throws Execut
ContainerStateMachine.Context context =
mock(ContainerStateMachine.Context.class);
when(trx.getLogEntry()).thenReturn(entry);
when(trx.getStateMachineContext()).thenReturn(context);
- if (failWithException) {
- when(dispatcher.dispatch(any(), any())).thenThrow(new
RuntimeException());
- } else {
- when(dispatcher.dispatch(any(),
any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
- .newBuilder().setCmdType(ContainerProtos.Type.WriteChunk)
- .setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
- .build());
- }
+
+ setUpMockDispatcherReturn(failWithException);
// Failing apply transaction on congtainer 1.
-
when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
- .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
- ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(
-
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
- .setContainerID(1)
- .setDatanodeUuid(UUID.randomUUID().toString()).build());
- AtomicReference<Throwable> throwable = new AtomicReference<>(null);
- Function<Throwable, ? extends Message> throwableSetter = t -> {
- throwable.set(t);
- return null;
- };
+ setUpLogProtoReturn(context, 1, 1);
+ ThrowableCatcher catcher = new ThrowableCatcher();
//apply transaction will fail because of runtime exception thrown by
dispatcher, which marks the first
// failure on container 1.
- stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
+ stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher,
times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
reset(dispatcher);
- assertNotNull(throwable.get());
- if (failWithException) {
- assertInstanceOf(RuntimeException.class, throwable.get());
- } else {
- assertInstanceOf(StorageContainerException.class, throwable.get());
- StorageContainerException sce = (StorageContainerException)
throwable.get();
- assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
sce.getResult());
- }
+ assertNotNull(catcher.getReceived()); // getCaught() is the never-null holder, not the failure itself
+ assertResults(failWithException, catcher.getCaught());
// Another apply transaction on same container 1 should fail because the
previous apply transaction failed.
- stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
+ stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher,
times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
- assertInstanceOf(StorageContainerException.class, throwable.get());
- StorageContainerException sce = (StorageContainerException)
throwable.get();
+ assertInstanceOf(StorageContainerException.class, catcher.getReceived());
+ StorageContainerException sce = (StorageContainerException)
catcher.getReceived();
assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());
// Another apply transaction on a different container 2 shouldn't fail
because the previous apply transaction
// failure was only on container 1.
-
when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
- .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
- ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(
-
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build())
- .setContainerID(2)
- .setDatanodeUuid(UUID.randomUUID().toString()).build());
-
+ setUpLogProtoReturn(context, 2, 1);
reset(dispatcher);
- throwable.set(null);
+ catcher.reset(); // forget the earlier failure so the assertNull below only reflects the next call
when(dispatcher.dispatch(any(),
any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk).setResult(ContainerProtos.Result.SUCCESS)
.build());
- Message succcesfulTransaction =
stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
+ Message succcesfulTransaction =
stateMachine.applyTransaction(trx).exceptionally(catcher.asSetter()).get();
verify(dispatcher,
times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
- assertNull(throwable.get());
+ assertNull(catcher.getReceived());
ContainerProtos.ContainerCommandResponseProto resp =
ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent());
assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult());
@@ -275,32 +225,87 @@ public void testWriteTimout() throws Exception {
return null;
}).when(dispatcher).dispatch(any(), any());
-
when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
- .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
-
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test
Data"))
- .setBlockID(
-
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
- .setContainerID(1)
- .setDatanodeUuid(UUID.randomUUID().toString()).build());
- AtomicReference<Throwable> throwable = new AtomicReference<>(null);
- Function<Throwable, ? extends Message> throwableSetter = t -> {
- throwable.set(t);
- return null;
- };
- Field writeChunkWaitMaxNs =
stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs");
- writeChunkWaitMaxNs.setAccessible(true);
- writeChunkWaitMaxNs.set(stateMachine, 1000_000_000);
+ setUpMockRequestProtoReturn(context, 1, 1);
+ ThrowableCatcher catcher = new ThrowableCatcher();
+
CompletableFuture<Message> firstWrite = stateMachine.write(entry, trx);
Thread.sleep(2000);
CompletableFuture<Message> secondWrite = stateMachine.write(entryNext,
trx);
- firstWrite.exceptionally(throwableSetter).get();
- assertNotNull(throwable.get());
- assertInstanceOf(InterruptedException.class, throwable.get());
-
- secondWrite.exceptionally(throwableSetter).get();
- assertNotNull(throwable.get());
- assertInstanceOf(StorageContainerException.class, throwable.get());
- StorageContainerException sce = (StorageContainerException)
throwable.get();
+ firstWrite.exceptionally(catcher.asSetter()).get();
+ assertNotNull(catcher.getReceived()); // check the captured Throwable; the holder itself is never null
+ assertInstanceOf(InterruptedException.class, catcher.getReceived());
+
+ secondWrite.exceptionally(catcher.asSetter()).get();
+ assertNotNull(catcher.getReceived());
+ assertInstanceOf(StorageContainerException.class, catcher.getReceived());
+ StorageContainerException sce = (StorageContainerException)
catcher.getReceived();
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
sce.getResult());
}
+
+ /**
+  * Stubs {@code dispatcher.dispatch(any(), any())} so that every dispatch fails.
+  *
+  * @param failWithException when {@code true} the stub throws a {@link RuntimeException};
+  *     otherwise it returns a WriteChunk response whose result is CONTAINER_INTERNAL_ERROR
+  */
+ private void setUpMockDispatcherReturn(boolean failWithException) {
+   if (failWithException) {
+     when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException());
+     return;
+   }
+
+   // Failure reported through the response payload rather than by an exception.
+   ContainerProtos.ContainerCommandResponseProto internalError =
+       ContainerProtos.ContainerCommandResponseProto.newBuilder()
+           .setCmdType(ContainerProtos.Type.WriteChunk)
+           .setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
+           .build();
+   when(dispatcher.dispatch(any(), any())).thenReturn(internalError);
+ }
+
+ /**
+  * Stubs {@code context.getRequestProto()} to return a WriteChunk request that carries
+  * {@code CONTAINER_DATA} and targets the given block.
+  *
+  * @param context mocked state machine context to stub
+  * @param containerId container id set on both the block id and the request
+  * @param localId local id set on the block id
+  */
+ private void setUpMockRequestProtoReturn(ContainerStateMachine.Context context,
+     int containerId, int localId) {
+   ContainerProtos.DatanodeBlockID blockId =
+       ContainerProtos.DatanodeBlockID.newBuilder()
+           .setContainerID(containerId)
+           .setLocalID(localId)
+           .build();
+   ContainerProtos.WriteChunkRequestProto writeChunk =
+       ContainerProtos.WriteChunkRequestProto.newBuilder()
+           .setData(ByteString.copyFromUtf8(CONTAINER_DATA))
+           .setBlockID(blockId)
+           .build();
+   ContainerProtos.ContainerCommandRequestProto request =
+       ContainerProtos.ContainerCommandRequestProto.newBuilder()
+           .setCmdType(ContainerProtos.Type.WriteChunk)
+           .setWriteChunk(writeChunk)
+           .setContainerID(containerId)
+           .setDatanodeUuid(UUID.randomUUID().toString())
+           .build();
+   when(context.getRequestProto()).thenReturn(request);
+ }
+
+ /**
+  * Asserts that the captured failure has the type expected for the chosen failure mode.
+  *
+  * @param failWithException {@code true} expects a {@link RuntimeException}; {@code false}
+  *     expects a StorageContainerException whose result is CONTAINER_INTERNAL_ERROR
+  * @param throwable holder of the failure to inspect
+  */
+ private void assertResults(boolean failWithException, AtomicReference<Throwable> throwable) {
+   Throwable failure = throwable.get();
+   if (failWithException) {
+     assertInstanceOf(RuntimeException.class, failure);
+     return;
+   }
+   // assertInstanceOf hands back the value already cast to the expected type.
+   StorageContainerException sce = assertInstanceOf(StorageContainerException.class, failure);
+   assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
+ }
+
+ /**
+  * Stubs {@code context.getLogProto()} to return a WriteChunk request (block id only, no
+  * chunk data) that targets the given block.
+  *
+  * @param context mocked state machine context to stub
+  * @param containerId container id set on both the block id and the request
+  * @param localId local id set on the block id
+  */
+ private void setUpLogProtoReturn(ContainerStateMachine.Context context, int containerId, int localId) {
+   ContainerProtos.DatanodeBlockID blockId =
+       ContainerProtos.DatanodeBlockID.newBuilder()
+           .setContainerID(containerId)
+           .setLocalID(localId)
+           .build();
+   ContainerProtos.WriteChunkRequestProto writeChunk =
+       ContainerProtos.WriteChunkRequestProto.newBuilder()
+           .setBlockID(blockId)
+           .build();
+   ContainerProtos.ContainerCommandRequestProto logProto =
+       ContainerProtos.ContainerCommandRequestProto.newBuilder()
+           .setCmdType(ContainerProtos.Type.WriteChunk)
+           .setWriteChunk(writeChunk)
+           .setContainerID(containerId)
+           .setDatanodeUuid(UUID.randomUUID().toString())
+           .build();
+   when(context.getLogProto()).thenReturn(logProto);
+ }
+
+ /**
+  * Remembers the most recent {@link Throwable} handed to the function returned by
+  * {@link #asSetter()}, so a test can inspect how a future failed.
+  */
+ private static class ThrowableCatcher {
+
+   private final AtomicReference<Throwable> caught = new AtomicReference<>();
+
+   /**
+    * @return a function that stores its argument in this catcher and yields {@code null};
+    *     the tests pass it to {@code CompletableFuture#exceptionally}
+    */
+   public Function<Throwable, ? extends Message> asSetter() {
+     return this::remember;
+   }
+
+   /**
+    * @return the holder itself, which is never {@code null}; use {@link #getReceived()}
+    *     to obtain the stored failure
+    */
+   public AtomicReference<Throwable> getCaught() {
+     return caught;
+   }
+
+   /** @return the stored failure, or {@code null} if none has been stored since creation or reset. */
+   public Throwable getReceived() {
+     return caught.get();
+   }
+
+   /** Clears the stored failure. */
+   public void reset() {
+     caught.set(null);
+   }
+
+   /** Stores {@code failure} and supplies the {@code null} replacement result. */
+   private Message remember(Throwable failure) {
+     caught.set(failure);
+     return null;
+   }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]