This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 11936fdc9a71224b9b7d0e5a2d55192cf735bc10 Author: Lei Zhang <[email protected]> AuthorDate: Fri Jun 21 18:25:18 2019 +0800 SCB-1321 Sub-transaction support concurrent --- .../servicecomb/pack/alpha/fsm/SagaActor.java | 39 +++- .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 204 ++++++++++++++++++++- alpha/pom.xml | 3 +- docs/fsm/assets/saga_state_diagram.png | Bin 229442 -> 237463 bytes docs/fsm/assets/state_table.png | Bin 286375 -> 311887 bytes 5 files changed, 234 insertions(+), 12 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index 492d187..2b48f45 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -95,6 +95,16 @@ public class SagaActor extends return goTo(SagaActorState.PARTIALLY_COMMITTED); } } + ).event(TxStartedEvent.class, + (event, data) -> { + updateTxEntity(event, data); + if (data.getExpirationTime() > 0) { + return stay() + .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); + } else { + return stay(); + } + } ).event(SagaTimeoutEvent.class, (event, data) -> { return goTo(SagaActorState.SUSPENDED) @@ -123,6 +133,16 @@ public class SagaActor extends return goTo(SagaActorState.PARTIALLY_ACTIVE); } } + ).event(TxEndedEvent.class, + (event, data) -> { + updateTxEntity(event, data); + if (data.getExpirationTime() > 0) { + return stay() + .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); + } else { + return stay(); + } + } ).event(SagaTimeoutEvent.class, (event, data) -> { return goTo(SagaActorState.SUSPENDED) @@ -196,8 +216,8 @@ public class SagaActor extends ).event(TxEndedEvent.class, SagaData.class, (event, data) -> { updateTxEntity(event, data); - // TODO 调用补偿方法 TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId()); + // TODO call compensate compensation(txEntity, data); return stay(); } @@ -233,8 +253,8 @@ public class SagaActor extends whenUnhandled( matchAnyEvent((event, data) -> { - LOG.error("unmatch event {}", event); - return stay(); + LOG.error("Unhandled event {}", event); + return goTo(SagaActorState.SUSPENDED).replying(data); }) ); @@ -290,26 +310,25 @@ public class SagaActor extends if (txEntity.getState() == TxState.ACTIVE) { txEntity.setEndTime(System.currentTimeMillis()); txEntity.setState(TxState.FAILED); - // TODO 调用补偿方法 data.getTxEntityMap().forEach((k, v) -> { if (v.getState() == TxState.COMMITTED) { - // TODO 调用补偿方法 + // call compensate compensation(v, data); } }); } } else if (event instanceof TxComponsitedEvent) { - //补偿中计数器减一 + // decrement the compensation running counter by one data.getCompensationRunningCounter().decrementAndGet(); txEntity.setState(TxState.COMPENSATED); - LOG.info("完成补偿 {}",txEntity.getLocalTxId()); + LOG.info("compensation is completed {}",txEntity.getLocalTxId()); } } } else if (event instanceof SagaEvent) { if (event instanceof SagaAbortedEvent) { data.getTxEntityMap().forEach((k, v) -> { if (v.getState() == TxState.COMMITTED) { - // TODO 调用补偿方法 + // call compensate compensation(v, data); } }); @@ -324,8 +343,8 @@ public class SagaActor extends } private void compensation(TxEntity txEntity, SagaData data) { - //补偿中计数器加一 + // increments the compensation running counter by one data.getCompensationRunningCounter().incrementAndGet(); - LOG.info("调用补偿方法 {}", txEntity.getLocalTxId()); + LOG.info("compensate {}", txEntity.getLocalTxId()); } } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index 156d67e..847d144 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -8,6 +8,7 @@ import akka.actor.Terminated; import akka.persistence.fsm.PersistentFSM; import akka.persistence.fsm.PersistentFSM.CurrentState; import akka.testkit.javadsl.TestKit; +import java.lang.invoke.MethodHandles; import java.time.Duration; import java.util.UUID; import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent; @@ -22,9 +23,13 @@ import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SagaActorTest { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + static ActorSystem system; @BeforeClass @@ -320,7 +325,7 @@ public class SagaActorTest { * 10. SagaAbortedEvent-1 */ @Test - public void receivedRemainingEventAfterfirstTxAbortedEventTest() { + public void receivedRemainingEventAfterFirstTxAbortedEventTest() { new TestKit(system) {{ final String globalTxId = UUID.randomUUID().toString(); final String localTxId_1 = UUID.randomUUID().toString(); @@ -609,6 +614,203 @@ public class SagaActorTest { }}; } + // tx concurrent execution + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxStartedEvent-12 + * 4. TxStartedEvent-13 + * 5. TxEndedEvent-11 + * 6. TxEndedEvent-12 + * 7. TxEndedEvent-13 + * 8. SagaEndedEvent-1 + */ + @Test + public void successfulTestWithTxConcurrent() throws InterruptedException { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxStartedEvent-12 + * 5. TxEndedEvent-11 + * 4. TxStartedEvent-13 + * 6. TxEndedEvent-12 + * 7. TxEndedEvent-13 + * 8. SagaEndedEvent-1 + */ + @Test + public void successfulTestWithTxConcurrentCross() throws InterruptedException { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 4. TxStartedEvent-12 + * 6. TxStartedEvent-13 + * 3. TxEndedEvent-11 + * 5. TxEndedEvent-12 + * 7. TxAbortedEvent-13 + * 8. TxComponsitedEvent-11 + * 9. TxComponsitedEvent-12 + * 10. SagaAbortedEvent-1 + */ + @Test + public void lastTxAbortedEventWithTxConcurrentTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + private static void assertSagaTransition(PersistentFSM.Transition transition, ActorRef actorRef, SagaActorState from, SagaActorState to) { assertEquals(transition.fsmRef(), actorRef); diff --git a/alpha/pom.xml b/alpha/pom.xml index 6921f5b..3dfaf13 100644 --- a/alpha/pom.xml +++ b/alpha/pom.xml @@ -32,11 +32,12 @@ <packaging>pom</packaging> <modules> <module>alpha-core</module> + <module>alpha-fsm</module> <module>alpha-spring-boot-compatibility</module> <module>alpha-spring-cloud-starter-eureka</module> <module>alpha-spring-cloud-starter-consul</module> <module>alpha-spring-cloud-starter-zookeeper</module> - <module>alpha-server</module> + <module>alpha-server</module> </modules> <build> diff --git a/docs/fsm/assets/saga_state_diagram.png b/docs/fsm/assets/saga_state_diagram.png index 96cd6c9..349fb72 100644 Binary files a/docs/fsm/assets/saga_state_diagram.png and b/docs/fsm/assets/saga_state_diagram.png differ diff --git a/docs/fsm/assets/state_table.png b/docs/fsm/assets/state_table.png index 5005597..569b2ce 100644 Binary files a/docs/fsm/assets/state_table.png and b/docs/fsm/assets/state_table.png differ
