This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 31265123edcaada8304bd7e662b9b118aad1bb3a Author: cherrylzhao <[email protected]> AuthorDate: Mon Aug 27 15:51:57 2018 +0800 SCB-856 Add exception junit test case for TCC alpha server. --- .../saga/alpha/tcc/server/AlphaTccServerTest.java | 66 ++++++++++++++++++++++ .../server/TccCoordinateCommandStreamObserver.java | 5 ++ 2 files changed, 71 insertions(+) diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java index 503d80a..60b6c57 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java @@ -118,6 +118,17 @@ public class AlphaTccServerTest { ); } + @Test + public void assertOnDisConnect() { + asyncStub.onConnected(serviceConfig, commandStreamObserver); + awaitUntilConnected(); + assertThat( + OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class)) + ); + blockingStub.onDisconnected(serviceConfig); + assertThat(commandStreamObserver.isCompleted(), is(true)); + } + private void awaitUntilConnected() { await().atMost(2, SECONDS).until(() -> null != (OmegaCallbacksRegistry.getRegistry().get(serviceName))); } @@ -168,6 +179,61 @@ public class AlphaTccServerTest { assertThat(command.getMethod(), is("cancel")); assertThat(command.getGlobalTxId(), is(globalTxId)); assertThat(command.getServiceName(), is(serviceName)); + assertThat(commandStreamObserver.isCompleted(), is(false)); + } + + @Test + public void assertOnCallbackNotExist() { + asyncStub.onConnected(serviceConfig, commandStreamObserver); + awaitUntilConnected(); + + OmegaCallbacksRegistry.getRegistry().remove(serviceName); + blockingStub.onTccTransactionStarted(newTxStart()); + blockingStub.participate(newParticipatedEvent("Succeed")); + 
GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); + assertThat(result.getAborted(), is(true)); + } + + @Test + public void assertOnCallbacksExecuteError() { + asyncStub.onConnected(serviceConfig, commandStreamObserver); + awaitUntilConnected(); + + OmegaCallbacksRegistry.getRegistry().get(serviceName).put(instanceId, new GrpcOmegaTccCallback(null)); + blockingStub.onTccTransactionStarted(newTxStart()); + blockingStub.participate(newParticipatedEvent("Succeed")); + GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); + + assertThat(result.getAborted(), is(true)); + assertThat(OmegaCallbacksRegistry.getRegistry().get(serviceName).size(), is(0)); + } + + @Test + public void assertOnSwitchOtherCallbackInstance() { + asyncStub.onConnected(serviceConfig, commandStreamObserver); + GrpcServiceConfig config = GrpcServiceConfig.newBuilder() + .setServiceName(serviceName) + .setInstanceId(uniquify("instanceId")) + .build(); + asyncStub.onConnected(config, commandStreamObserver); + + await().atMost(1, SECONDS).until(() -> (OmegaCallbacksRegistry.getRegistry().get(serviceName) != null)); + await().atMost(1, SECONDS).until(() -> (OmegaCallbacksRegistry.getRegistry().get(serviceName).size() == 2)); + + OmegaCallbacksRegistry.getRegistry().get(serviceName).remove(instanceId); + blockingStub.onTccTransactionStarted(newTxStart()); + blockingStub.participate(newParticipatedEvent("Succeed")); + GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); + + await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty()); + assertThat(receivedCommands.size(), is(1)); + GrpcTccCoordinateCommand command = receivedCommands.poll(); + assertThat(command.getMethod(), is("confirm")); + assertThat(command.getGlobalTxId(), is(globalTxId)); + assertThat(command.getServiceName(), is(serviceName)); + + assertThat(result.getAborted(), is(false)); + } private GrpcTccParticipatedEvent newParticipatedEvent(String status) { diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java index f97c5d8..a520d89 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java @@ -26,6 +26,11 @@ public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTc private Queue<GrpcTccCoordinateCommand> receivedCommands; private Consumer<GrpcTccCoordinateCommand> consumer; + + public boolean isCompleted() { + return completed; + } + private boolean completed = false; public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer,
