This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit df20c2187c2e1665b92ec82e76bb60062c7a442c Author: Lei Zhang <[email protected]> AuthorDate: Wed Jul 3 21:39:43 2019 +0800 SCB-1321 Add test case receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest --- .../servicecomb/pack/alpha/fsm/SagaActor.java | 10 +++--- .../fsm/event/consumer/SagaEventConsumer.java | 6 ++-- .../alpha/server/fsm/GrpcSagaEventService.java | 4 ++- .../alpha/server/fsm/AlphaIntegrationFsmTest.java | 37 ++++++++++++++++++++-- .../alpha/server/fsm/OmegaEventSagaSimulator.java | 1 - 5 files changed, 47 insertions(+), 11 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index 51d6c52..5104aca 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -256,7 +256,8 @@ public class SagaActor extends } ).event(TxComponsitedCheckInternalEvent.class, SagaData.class, (event, data) -> { - if (hasCompensationSentTx(data)) { + if (hasCompensationSentTx(data) || !data.isTerminated()) { + //if (hasCompensationSentTx(data)) { return stay().replying(data); } else { return goTo(SagaActorState.COMPENSATED) @@ -266,7 +267,7 @@ public class SagaActor extends } ).event(SagaAbortedEvent.class, SagaData.class, (event, data) -> { - //data.setTerminated(true); + data.setTerminated(true); if (hasCommittedTx(data)) { SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED); return stay().replying(data).applying(domainEvent); @@ -409,8 +410,8 @@ public class SagaActor extends txEntity.setEndTime(System.currentTimeMillis()); if (domainEvent.getState() == TxState.COMMITTED) { // stop - data.setEndTime(System.currentTimeMillis()); - data.setTerminated(true); + //data.setEndTime(System.currentTimeMillis()); + //data.setTerminated(true); txEntity.setState(domainEvent.getState()); } else if (domainEvent.getState() == TxState.FAILED) { txEntity.setState(domainEvent.getState()); @@ -430,6 +431,7 @@ public class SagaActor extends } else if (event instanceof SagaEndedDomain) { SagaEndedDomain domainEvent = (SagaEndedDomain) event; if (domainEvent.getState() == SagaActorState.FAILED) { + data.setTerminated(true); data.getTxEntityMap().forEach((k, v) -> { if (v.getState() == TxState.COMMITTED) { // call compensate diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java index 7ddbc1a..fa481f8 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java @@ -58,10 +58,10 @@ public class SagaEventConsumer { saga = optional.get(); } saga.tell(event, ActorRef.noSender()); - LOG.info("tell {} to {}", event.toString(),saga); - //TODO WAL commit + if(LOG.isDebugEnabled()){ + LOG.debug("tell {} to {}", event.toString(),saga); + } }catch (Exception ex){ - //TODO throw ex; } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java index 2394ab2..72a1dfa 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java @@ -78,7 +78,9 @@ public class GrpcSagaEventService extends TxEventServiceImplBase { @Override @Trace("onTransactionEvent") public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { - LOG.info("onText {}",message); + if(LOG.isDebugEnabled()){ + LOG.debug("onText {}",message); + } boolean ok = true; BaseEvent event = null; if (message.getType().equals(EventType.SagaStartedEvent.name())) { diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java index 27f5fd3..3bbaf6d 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java @@ -31,6 +31,7 @@ import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; import org.apache.servicecomb.pack.alpha.server.AlphaApplication; import org.apache.servicecomb.pack.alpha.server.AlphaConfig; +import org.apache.servicecomb.pack.common.EventType; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -50,7 +51,7 @@ import org.springframework.test.context.junit4.SpringRunner; "alpha.event.pollingInterval=1", "spring.main.allow-bean-definition-overriding=true", "alpha.model.actor.enabled=true", - "spring.profiles.active=akka-persistence-redis" + "spring.profiles.active=akka-persistence-mem" }) public class AlphaIntegrationFsmTest { private static final OmegaEventSender omegaEventSender = OmegaEventSender.builder().build(); @@ -187,7 +188,39 @@ public class AlphaIntegrationFsmTest { omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { omegaEventSender.getBlockingStub().onTxEvent(event); }); - await().atMost(2, SECONDS).until(() -> { + await().atMost(5, SECONDS).until(() -> { + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + Assert.assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); + Assert.assertEquals(sagaData.getTxEntityMap().size(),3); + Assert.assertTrue(sagaData.getBeginTime() > 0); + Assert.assertTrue(sagaData.getEndTime() > 0); + Assert.assertTrue(sagaData.getEndTime() > sagaData.getBeginTime()); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); + Assert.assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED); + } + + @Test + public void receivedRemainingEventAndDelayLastTxEventAfterFirstTxAbortedEventTest() { + omegaEventSender.onConnected(); + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + omegaEventSender.getOmegaEventSagaSimulator().receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + if(event.getType().equals(EventType.TxStartedEvent.name()) && event.getLocalTxId().equals(localTxId_3)){ + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + omegaEventSender.getBlockingStub().onTxEvent(event); + }); + await().atMost(5, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java index 52bcba5..433f54a 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/OmegaEventSagaSimulator.java @@ -262,7 +262,6 @@ public class OmegaEventSagaSimulator { .build(); } - public static Builder builder() { return new Builder(); }
