This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 9cee529906beaba4b1c41ab0279093acc8b90b67 Author: Lei Zhang <[email protected]> AuthorDate: Mon Jul 1 22:14:52 2019 +0800 SCB-1321 Support Akka Persistent Redis Recovery --- alpha/alpha-fsm/README.md | 1 + alpha/alpha-fsm/pom.xml | 19 +- .../servicecomb/pack/alpha/fsm/SagaActor.java | 284 ++++++++++++-------- .../apache/servicecomb/pack/alpha/fsm/TxState.java | 3 +- .../AddTxEventDomain.java} | 40 ++- .../fsm/{TxState.java => domain/DomainEvent.java} | 11 +- .../{TxState.java => domain/SagaEndedDomain.java} | 20 +- .../SagaStartedDomain.java} | 28 +- .../UpdateTxEventDomain.java} | 41 ++- ...t.java => TxComponsitedCheckInternalEvent.java} | 31 ++- .../pack/alpha/fsm/event/base/BaseEvent.java | 5 + .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 288 +++++++++++++++++++-- .../pack/alpha/fsm/SagaEventSender.java | 60 ++++- .../pack/alpha/fsm/SagaIntegrationTest.java | 52 +++- .../alpha-fsm/src/test/resources/application.yaml | 20 +- docs/fsm/assets/saga_state_diagram.png | Bin 237463 -> 244645 bytes docs/fsm/plantuml/saga-state-diagram.puml | 2 +- 17 files changed, 705 insertions(+), 200 deletions(-) diff --git a/alpha/alpha-fsm/README.md b/alpha/alpha-fsm/README.md index 3a37fed..e7ea7b9 100644 --- a/alpha/alpha-fsm/README.md +++ b/alpha/alpha-fsm/README.md @@ -6,6 +6,7 @@ ## Test State Machine ``` +git clone -b SCB-1321 [email protected]:apache/servicecomb-pack.git cd alpha mvn clean package -pl alpha-fsm ``` \ No newline at end of file diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml index b1897d9..8788430 100644 --- a/alpha/alpha-fsm/pom.xml +++ b/alpha/alpha-fsm/pom.xml @@ -31,6 +31,7 @@ <properties> <leveldbjni-all.version>1.8</leveldbjni-all.version> + <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version> </properties> <dependencyManagement> @@ -72,12 +73,7 @@ <dependency> <groupId>javax.persistence</groupId> <artifactId>javax.persistence-api</artifactId> - </dependency> -<!-- <dependency>--> -<!-- <groupId>org.apache.logging.log4j</groupId>--> -<!-- <artifactId>log4j-slf4j-impl</artifactId>--> -<!-- <scope>test</scope>--> -<!-- </dependency>--> + </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> @@ -103,6 +99,11 @@ <artifactId>leveldbjni-all</artifactId> <version>${leveldbjni-all.version}</version> </dependency> + <dependency> + <groupId>com.safety-data</groupId> + <artifactId>akka-persistence-redis_2.12</artifactId> + <version>${akka-persistence-redis.version}</version> + </dependency> <!-- For testing the artifacts scope are test--> <dependency> @@ -138,7 +139,11 @@ <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-testkit_2.12</artifactId> - </dependency> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.12</artifactId> + </dependency> </dependencies> </project> diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index dca006e..d22cb79 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -22,19 +22,20 @@ import akka.persistence.fsm.AbstractPersistentFSM; import java.lang.invoke.MethodHandles; import java.util.Arrays; import java.util.concurrent.TimeUnit; +import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain; +import org.apache.servicecomb.pack.alpha.fsm.domain.DomainEvent; +import org.apache.servicecomb.pack.alpha.fsm.domain.SagaEndedDomain; +import org.apache.servicecomb.pack.alpha.fsm.domain.SagaStartedDomain; +import org.apache.servicecomb.pack.alpha.fsm.domain.UpdateTxEventDomain; import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent.DomainEvent; import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedCheckInternalEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent; import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; -import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension; @@ -62,14 +63,15 @@ public class SagaActor extends when(SagaActorState.IDEL, matchEvent(SagaStartedEvent.class, (event, data) -> { - data.setGlobalTxId(event.getGlobalTxId()); - data.setBeginTime(System.currentTimeMillis()); + SagaStartedDomain domainEvent = new SagaStartedDomain(event.getGlobalTxId(), + event.getCreateTime(), event.getTimeout()); if (event.getTimeout() > 0) { - data.setExpirationTime(data.getBeginTime() + event.getTimeout() * 1000); return goTo(SagaActorState.READY) - .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); + .applying(domainEvent) + .forMax(Duration.create(event.getTimeout(), TimeUnit.MILLISECONDS)); } else { - return goTo(SagaActorState.READY); + return goTo(SagaActorState.READY) + .applying(domainEvent); } } @@ -79,25 +81,36 @@ public class SagaActor extends when(SagaActorState.READY, matchEvent(TxStartedEvent.class, SagaData.class, (event, data) -> { - updateTxEntity(event, data); + AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), + event.getLocalTxId()); if (data.getExpirationTime() > 0) { return goTo(SagaActorState.PARTIALLY_ACTIVE) + .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); } else { - return goTo(SagaActorState.PARTIALLY_ACTIVE); + return goTo(SagaActorState.PARTIALLY_ACTIVE) + .applying(domainEvent); } } ).event(SagaEndedEvent.class, (event, data) -> { - return goTo(SagaActorState.SUSPENDED).replying(data); + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + return goTo(SagaActorState.SUSPENDED) + .applying(domainEvent) + .replying(data); } ).event(SagaAbortedEvent.class, (event, data) -> { - return goTo(SagaActorState.SUSPENDED).replying(data); + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + return goTo(SagaActorState.SUSPENDED) + .applying(domainEvent) + .replying(data); } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) + .applying(domainEvent) .replying(data); }) ); @@ -105,34 +118,43 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_ACTIVE, matchEvent(TxEndedEvent.class, SagaData.class, (event, data) -> { - updateTxEntity(event, data); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), + event.getLocalTxId(), TxState.COMMITTED); if (data.getExpirationTime() > 0) { return goTo(SagaActorState.PARTIALLY_COMMITTED) + .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); } else { - return goTo(SagaActorState.PARTIALLY_COMMITTED); + return goTo(SagaActorState.PARTIALLY_COMMITTED) + .applying(domainEvent); } } ).event(TxStartedEvent.class, (event, data) -> { - updateTxEntity(event, data); + AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), + event.getLocalTxId()); if (data.getExpirationTime() > 0) { return stay() + .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); } else { - return stay(); + return stay().applying(domainEvent); } } ).event(SagaTimeoutEvent.class, (event, data) -> { + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) + .applying(domainEvent) .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(TxAbortedEvent.class, (event, data) -> { - updateTxEntity(event, data); - return goTo(SagaActorState.FAILED); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), + event.getLocalTxId(), TxState.FAILED); + return goTo(SagaActorState.FAILED) + .applying(domainEvent); } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { @@ -143,47 +165,55 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_COMMITTED, matchEvent(TxStartedEvent.class, (event, data) -> { - updateTxEntity(event, data); + AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), + event.getLocalTxId()); if (data.getExpirationTime() > 0) { return goTo(SagaActorState.PARTIALLY_ACTIVE) + .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); } else { - return goTo(SagaActorState.PARTIALLY_ACTIVE); + return goTo(SagaActorState.PARTIALLY_ACTIVE) + .applying(domainEvent); } } ).event(TxEndedEvent.class, (event, data) -> { - updateTxEntity(event, data); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), + event.getLocalTxId(), TxState.COMMITTED); if (data.getExpirationTime() > 0) { return stay() + .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); } else { - return stay(); + return stay().applying(domainEvent); } } ).event(SagaTimeoutEvent.class, (event, data) -> { + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) + .applying(domainEvent) .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(SagaEndedEvent.class, (event, data) -> { - data.setEndTime(System.currentTimeMillis()); + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.COMMITTED); return goTo(SagaActorState.COMMITTED) + .applying(domainEvent) .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(SagaAbortedEvent.class, (event, data) -> { - data.setEndTime(System.currentTimeMillis()); - updateTxEntity(event, data); - return goTo(SagaActorState.FAILED); + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED); + return goTo(SagaActorState.FAILED).applying(domainEvent); } ).event(TxAbortedEvent.class, (event, data) -> { - updateTxEntity(event, data); - return goTo(SagaActorState.FAILED); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), + event.getLocalTxId(), TxState.FAILED); + return goTo(SagaActorState.FAILED).applying(domainEvent); } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { @@ -194,18 +224,25 @@ public class SagaActor extends when(SagaActorState.FAILED, matchEvent(SagaTimeoutEvent.class, SagaData.class, (event, data) -> { - data.setEndTime(System.currentTimeMillis()); + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) + .applying(domainEvent) .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(TxComponsitedEvent.class, SagaData.class, (event, data) -> { - data.setEndTime(System.currentTimeMillis()); - updateTxEntity(event, data); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), + event.getLocalTxId(), TxState.COMPENSATED); + return stay().applying(domainEvent).andThen(exec(_data -> { + self().tell(TxComponsitedCheckInternalEvent.builder().build(), self()); + })); + } + ).event(TxComponsitedCheckInternalEvent.class, SagaData.class, + (event, data) -> { if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0) || hasCommittedTx(data)) { - return stay(); + return stay().replying(data); } else { return goTo(SagaActorState.COMPENSATED) .replying(data) @@ -214,30 +251,36 @@ public class SagaActor extends } ).event(SagaAbortedEvent.class, SagaData.class, (event, data) -> { - data.setEndTime(System.currentTimeMillis()); - updateTxEntity(event, data); data.setTerminated(true); - if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0) - || hasCommittedTx(data)) { - return stay(); + if (hasCommittedTx(data)) { + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED); + return stay().replying(data).applying(domainEvent); + } else if(hasCompensationSentTx(data)){ + return stay().replying(data); } else { + SagaEndedDomain domainEvent = new SagaEndedDomain( + SagaActorState.COMPENSATED); return goTo(SagaActorState.COMPENSATED) + .applying(domainEvent) .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } } ).event(TxStartedEvent.class, SagaData.class, (event, data) -> { - updateTxEntity(event, data); - return stay(); + AddTxEventDomain domainEvent = new AddTxEventDomain(event.getParentTxId(), + event.getLocalTxId()); + return stay().applying(domainEvent); } ).event(TxEndedEvent.class, SagaData.class, (event, data) -> { - updateTxEntity(event, data); - TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId()); - // TODO call compensate - compensation(txEntity, data); - return stay(); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), + event.getLocalTxId(), TxState.COMMITTED); + return stay().applying(domainEvent).andThen(exec(_data -> { + TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId()); + // call compensate + compensation(txEntity, _data); + })); } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { @@ -248,6 +291,17 @@ public class SagaActor extends when(SagaActorState.COMMITTED, matchAnyEvent( (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + /** + * deleteMessages 只会删除redis中actor的数据,但是不会删除actor的highestSequenceNr https://github.com/akka/akka/issues/21181 + * 已停止的 actor highestSequenceNr 需要手动清理,例如 actor 的持久化ID为 3c500008-7b9f-415f-b2fd-e6ad0d455fc1 + * 在Redis中当key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr没有匹配的 + * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1时,表示这个actor已经停止,可以使用以下命令清理 + * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr + * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1 + * */ + deleteMessages(lastSequenceNr()); + deleteSnapshot(snapshotSequenceNr()); return stop(); } ) @@ -256,6 +310,9 @@ public class SagaActor extends when(SagaActorState.SUSPENDED, matchAnyEvent( (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + deleteMessages(lastSequenceNr()); + deleteSnapshot(snapshotSequenceNr()); return stop(); } ) @@ -264,6 +321,9 @@ public class SagaActor extends when(SagaActorState.COMPENSATED, matchAnyEvent( (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + deleteMessages(lastSequenceNr()); + deleteSnapshot(snapshotSequenceNr()); return stop(); } ) @@ -278,6 +338,11 @@ public class SagaActor extends onTransition( matchState(null, null, (from, to) -> { + if (stateData().getGlobalTxId() != null) { + stateData().setLastState(to); + LogExtension.LogExtensionProvider.get(getContext().getSystem()) + .putSagaData(stateData().getGlobalTxId(), stateData()); + } LOG.info("transition {} {} -> {}", getSelf(), from, to); }) ); @@ -288,7 +353,8 @@ public class SagaActor extends LOG.info("stop {} {}", data.getGlobalTxId(), state); data.setTerminated(true); data.setLastState(state); - LogExtension.LogExtensionProvider.get(getContext().getSystem()).putSagaData(data.getGlobalTxId(),data); + LogExtension.LogExtensionProvider.get(getContext().getSystem()) + .putSagaData(data.getGlobalTxId(), data); } ) ); @@ -296,73 +362,77 @@ public class SagaActor extends } @Override - public void onRecoveryCompleted() { - LOG.info("onRecoveryCompleted: {} {}", stateName(), stateData()); - } - - @Override - public Class domainEventClass() { - return SagaDomainEvent.DomainEvent.class; - } - - - @Override - public String persistenceId() { - return persistenceId; - } - - @Override - public SagaData applyEvent(DomainEvent domainEvent, SagaData currentData) { - return currentData; - } - - private void updateTxEntity(BaseEvent event, SagaData data) { - if (event instanceof TxEvent) { - TxEvent txEvent = (TxEvent) event; - if (!data.getTxEntityMap().containsKey(txEvent.getLocalTxId())) { - if (event instanceof TxStartedEvent) { - TxEntity txEntity = TxEntity.builder() - .localTxId(txEvent.getLocalTxId()) - .parentTxId(txEvent.getParentTxId()) - .state(TxState.ACTIVE) - .build(); - data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); - } + public SagaData applyEvent(DomainEvent event, SagaData data) { + if (event instanceof SagaStartedDomain) { + SagaStartedDomain domainEvent = (SagaStartedDomain) event; + data.setGlobalTxId(domainEvent.getGlobalTxId()); + data.setBeginTime(domainEvent.getCreateTime()); + data.setExpirationTime(domainEvent.getExpirationTime()); + } else if (event instanceof AddTxEventDomain) { + AddTxEventDomain domainEvent = (AddTxEventDomain) event; + if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) { + TxEntity txEntity = TxEntity.builder() + .localTxId(domainEvent.getLocalTxId()) + .parentTxId(domainEvent.getParentTxId()) + .state(domainEvent.getState()) + .build(); + data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); } else { - TxEntity txEntity = data.getTxEntityMap().get(txEvent.getLocalTxId()); - if (event instanceof TxEndedEvent) { - if (txEntity.getState() == TxState.ACTIVE) { - txEntity.setEndTime(System.currentTimeMillis()); - txEntity.setState(TxState.COMMITTED); - } - } else if (event instanceof TxAbortedEvent) { - if (txEntity.getState() == TxState.ACTIVE) { - txEntity.setEndTime(System.currentTimeMillis()); - txEntity.setState(TxState.FAILED); - data.getTxEntityMap().forEach((k, v) -> { - if (v.getState() == TxState.COMMITTED) { - // call compensate - compensation(v, data); - } - }); + LOG.warn("TxEntity {} already exists", domainEvent.getLocalTxId()); + } + } else if (event instanceof UpdateTxEventDomain) { + UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event; + TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId()); + txEntity.setEndTime(System.currentTimeMillis()); + if (domainEvent.getState() == TxState.COMMITTED) { + txEntity.setState(domainEvent.getState()); + } else if (domainEvent.getState() == TxState.FAILED) { + txEntity.setState(domainEvent.getState()); + data.getTxEntityMap().forEach((k, v) -> { + if (v.getState() == TxState.COMMITTED) { + // call compensate + compensation(v, data); } - } else if (event instanceof TxComponsitedEvent) { - // decrement the compensation running counter by one - data.getCompensationRunningCounter().decrementAndGet(); - txEntity.setState(TxState.COMPENSATED); - LOG.info("compensation is completed {}", txEntity.getLocalTxId()); - } + }); + } else if (domainEvent.getState() == TxState.COMPENSATED) { + // decrement the compensation running counter by one + data.getCompensationRunningCounter().decrementAndGet(); + txEntity.setState(domainEvent.getState()); + LOG.info("compensation is completed {}", txEntity.getLocalTxId()); } - } else if (event instanceof SagaEvent) { - if (event instanceof SagaAbortedEvent) { + } else if (event instanceof SagaEndedDomain) { + SagaEndedDomain domainEvent = (SagaEndedDomain) event; + if (domainEvent.getState() == SagaActorState.FAILED) { data.getTxEntityMap().forEach((k, v) -> { if (v.getState() == TxState.COMMITTED) { // call compensate compensation(v, data); } }); + } else if (domainEvent.getState() == SagaActorState.SUSPENDED) { + + } else if (domainEvent.getState() == SagaActorState.COMPENSATED) { + } } + LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId()); + return data; + } + + @Override + public void onRecoveryCompleted() { + LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId()); + } + + @Override + public Class domainEventClass() { + return DomainEvent.class; + } + + + @Override + public String persistenceId() { + return persistenceId; } private boolean hasCommittedTx(SagaData data) { @@ -371,9 +441,17 @@ public class SagaActor extends .count() > 0; } + private boolean hasCompensationSentTx(SagaData data) { + return data.getTxEntityMap().entrySet().stream() + .filter(map -> map.getValue().getState() == TxState.COMPENSATION_SENT) + .count() > 0; + } + private void compensation(TxEntity txEntity, SagaData data) { // increments the compensation running counter by one data.getCompensationRunningCounter().incrementAndGet(); + //TODO call omega compensate method LOG.info("compensate {}", txEntity.getLocalTxId()); + txEntity.setState(TxState.COMPENSATION_SENT); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java index 0494895..af02ce8 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java @@ -21,5 +21,6 @@ public enum TxState { ACTIVE, FAILED, COMMITTED, - COMPENSATED; + COMPENSATION_SENT, // The compensation method has been called to wait for TxComponsitedEvent + COMPENSATED } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java similarity index 51% copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java index 5794566..c7c65a3 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java @@ -15,29 +15,41 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.event.base; +package org.apache.servicecomb.pack.alpha.fsm.domain; -import java.io.Serializable; +import org.apache.servicecomb.pack.alpha.fsm.TxState; -public abstract class BaseEvent implements Serializable { - private String globalTxId; +public class AddTxEventDomain implements DomainEvent { + private String parentTxId; + private String localTxId; + private TxState state = TxState.ACTIVE; - public BaseEvent() { + public AddTxEventDomain(String parentTxId, String localTxId) { + this.parentTxId = parentTxId; + this.localTxId = localTxId; + } + + public String getParentTxId() { + return parentTxId; + } + + public void setParentTxId(String parentTxId) { + this.parentTxId = parentTxId; + } + public String getLocalTxId() { + return localTxId; } - public String getGlobalTxId() { - return globalTxId; + public void setLocalTxId(String localTxId) { + this.localTxId = localTxId; } - public void setGlobalTxId(String globalTxId) { - this.globalTxId = globalTxId; + public TxState getState() { + return state; } - @Override - public String toString() { - return "BaseEvent{" + - "globalTxId='" + globalTxId + '\'' + - '}'; + public void setState(TxState state) { + this.state = state; } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java similarity index 85% copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java index 0494895..10c293c 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm; +package org.apache.servicecomb.pack.alpha.fsm.domain; + +import java.io.Serializable; + +public interface DomainEvent extends Serializable { -public enum TxState { - ACTIVE, - FAILED, - COMMITTED, - COMPENSATED; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java similarity index 70% copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java index 0494895..d3e40b1 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java @@ -15,11 +15,19 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm; +package org.apache.servicecomb.pack.alpha.fsm.domain; -public enum TxState { - ACTIVE, - FAILED, - COMMITTED, - COMPENSATED; +import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; + +public class SagaEndedDomain implements DomainEvent { + + private SagaActorState state; + + public SagaEndedDomain(SagaActorState state) { + this.state = state; + } + + public SagaActorState getState() { + return state; + } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java similarity index 65% copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java index 5794566..fe75f04 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java @@ -15,29 +15,31 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.event.base; +package org.apache.servicecomb.pack.alpha.fsm.domain; -import java.io.Serializable; +public class SagaStartedDomain implements DomainEvent { -public abstract class BaseEvent implements Serializable { + private long createTime; private String globalTxId; + private long expirationTime; - public BaseEvent() { + public SagaStartedDomain(String globalTxId, long createTime, int timeout) { + this.createTime = createTime; + this.globalTxId = globalTxId; + if (timeout > 0) { + this.expirationTime = System.currentTimeMillis() + timeout * 1000; + } + } + public long getCreateTime() { + return createTime; } public String getGlobalTxId() { return globalTxId; } - public void setGlobalTxId(String globalTxId) { - this.globalTxId = globalTxId; - } - - @Override - public String toString() { - return "BaseEvent{" + - "globalTxId='" + globalTxId + '\'' + - '}'; + public long getExpirationTime() { + return expirationTime; } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java similarity index 50% copy from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java copy to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java index 5794566..839cfe0 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java @@ -15,29 +15,42 @@ * limitations under the License. */ -package org.apache.servicecomb.pack.alpha.fsm.event.base; +package org.apache.servicecomb.pack.alpha.fsm.domain; -import java.io.Serializable; +import org.apache.servicecomb.pack.alpha.fsm.TxState; -public abstract class BaseEvent implements Serializable { - private String globalTxId; +public class UpdateTxEventDomain implements DomainEvent { + private String parentTxId; + private String localTxId; + private TxState state; - public BaseEvent() { + public UpdateTxEventDomain(String parentTxId, String localTxId, TxState state) { + this.parentTxId = parentTxId; + this.localTxId = localTxId; + this.state = state; + } + + public String getParentTxId() { + return parentTxId; + } + + public void setParentTxId(String parentTxId) { + this.parentTxId = parentTxId; + } + public String getLocalTxId() { + return localTxId; } - public String getGlobalTxId() { - return globalTxId; + public void setLocalTxId(String localTxId) { + this.localTxId = localTxId; } - public void setGlobalTxId(String globalTxId) { - this.globalTxId = globalTxId; + public TxState getState() { + return state; } - @Override - public String toString() { - return "BaseEvent{" + - "globalTxId='" + globalTxId + '\'' + - '}'; + public void setState(TxState state) { + this.state = state; } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java similarity index 60% rename from alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java rename to alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java index b16e44a..225ef26 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedCheckInternalEvent.java @@ -17,15 +17,24 @@ package org.apache.servicecomb.pack.alpha.fsm.event; -public class SagaDomainEvent { - public interface DomainEvent {} - - public enum SagaStartedEvent implements DomainEvent {INSTANCE} - public enum SagaEndedEvent implements DomainEvent {INSTANCE} - public enum SagaAbortedEvent implements DomainEvent {INSTANCE} - public enum SagaTimeoutEvent implements DomainEvent {INSTANCE} - public enum TxStartedEvent implements DomainEvent {INSTANCE} - public enum TxEndedEvent implements DomainEvent {INSTANCE} - public enum TxAbortedEvent implements DomainEvent {INSTANCE} - public enum TxComponsitedEvent implements DomainEvent {INSTANCE} +import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; + +public class TxComponsitedCheckInternalEvent extends TxEvent { + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private TxComponsitedCheckInternalEvent txComponsitedEvent; + + private Builder() { + txComponsitedEvent = new TxComponsitedCheckInternalEvent(); + } + + public TxComponsitedCheckInternalEvent build() { + return txComponsitedEvent; + } + } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java index 5794566..4ba8372 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java @@ -21,11 +21,16 @@ import java.io.Serializable; public abstract class BaseEvent implements Serializable { private String globalTxId; + private long createTime = System.currentTimeMillis(); public BaseEvent() { } + public long getCreateTime() { + return createTime; + } + public String getGlobalTxId() { return globalTxId; } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index 5644c1b..313c526 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -25,9 +25,13 @@ import akka.actor.Terminated; import akka.persistence.fsm.PersistentFSM; import akka.persistence.fsm.PersistentFSM.CurrentState; import akka.testkit.javadsl.TestKit; +import com.typesafe.config.ConfigFactory; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; +import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.LogExtension; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -36,9 +40,30 @@ public class SagaActorTest { static ActorSystem system; + private static Map<String,Object> getPersistenceMemConfig(){ + Map<String, Object> map = new HashMap<>(); + map.put("akka.persistence.journal.plugin", "akka.persistence.journal.inmem"); + map.put("akka.persistence.journal.leveldb.dir", "target/example/journal"); + map.put("akka.persistence.snapshot-store.plugin", "akka.persistence.snapshot-store.local"); + map.put("akka.persistence.snapshot-store.local.dir", "target/example/snapshots"); + return map; + } + + private static Map<String,Object> getPersistenceRedisConfig(){ + Map<String, Object> map = new HashMap<>(); + map.put("akka.actor.warn-about-java-serializer-usage",false); + map.put("akka.persistence.journal.plugin", "akka-persistence-redis.journal"); + map.put("akka.persistence.snapshot-store.plugin", "akka-persistence-redis.snapshot"); + map.put("akka-persistence-redis.redis.mode", "simple"); + map.put("akka-persistence-redis.redis.host", "localhost"); + map.put("akka-persistence-redis.redis.port", "6379"); + map.put("akka-persistence-redis.redis.database", "0"); + return map; + } + @BeforeClass public static void setup() { - system = ActorSystem.create("SagaActorTest"); + system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig())); } @AfterClass @@ -69,7 +94,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -122,6 +147,158 @@ public class SagaActorTest { /** * 1. SagaStartedEvent-1 * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 4. TxStartedEvent-13 + * 5. TxEndedEvent-13 + * 6. SagaEndedEvent-1 + */ + @Test + public void successfulRecoveryWithCorrectStateDataTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + String persistenceId = genPersistenceId(); + ActorRef saga = system.actorOf(SagaActor.props(persistenceId)); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + saga.tell(event, getRef()); + }); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + //expectTerminated(saga); + + ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga"); + watch(recoveredSaga); + recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + recoveredSaga.tell(event, getRef()); + }); + + currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state()); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), recoveredSaga); + system.stop(saga); + }}; + } + + @Test + public void successfulRecoveryWithCorrectStateDataTestAndDiffentSystem() { + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + String persistenceId = genPersistenceId(); + ActorSystem system1 = ActorSystem.create("SagaActorTest1", ConfigFactory.parseMap(getPersistenceRedisConfig())); + new TestKit(system1) {{ + ActorRef saga = system1.actorOf(SagaActor.props(persistenceId)); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + saga.tell(event, getRef()); + }); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + system1.stop(saga); + }}; + TestKit.shutdownActorSystem(system1); + + ActorSystem system2 = ActorSystem.create("SagaActorTest2", ConfigFactory.parseMap(getPersistenceRedisConfig())); + new TestKit(system2) {{ + ActorRef recoveredSaga = system2.actorOf(SagaActor.props(persistenceId), "recoveredSaga"); + watch(recoveredSaga); + recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + recoveredSaga.tell(event, getRef()); + }); + + CurrentState currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.PARTIALLY_ACTIVE, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), recoveredSaga); + system2.stop(recoveredSaga); + }}; + TestKit.shutdownActorSystem(system2); + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 * 3. TxAbortedEvent-11 * 7. SagaAbortedEvent-1 */ @@ -131,7 +308,7 @@ public class SagaActorTest { final String globalTxId = UUID.randomUUID().toString(); final String localTxId_1 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -185,7 +362,7 @@ public class SagaActorTest { final String localTxId_1 = UUID.randomUUID().toString(); final String localTxId_2 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -249,7 +426,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -303,6 +480,79 @@ public class SagaActorTest { /** * 1. SagaStartedEvent-1 * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. TxStartedEvent-13 + * 7. TxAbortedEvent-13 + * 8. TxComponsitedEvent-11 + * 9. TxComponsitedEvent-12 + * 10. SagaAbortedEvent-1 + */ + @Test + public void sagaAbortedEventBeforeTxComponsitedEventTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + saga.tell(event, getRef()); + }); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); + + SagaData sagaData = expectMsgClass(SagaData.class); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + + system.stop(saga); + }}; + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 * 3. TxAbortedEvent-11 * 4. TxStartedEvent-12 * 5. TxEndedEvent-12 @@ -320,7 +570,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -381,7 +631,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -417,7 +667,13 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED); - SagaData sagaData = expectMsgClass(SagaData.class); + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -425,12 +681,6 @@ public class SagaActorTest { assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED); assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); - transition = expectMsgClass(PersistentFSM.Transition.class); - assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); - - Terminated terminated = expectMsgClass(Terminated.class); - assertEquals(terminated.getActor(), saga); - system.stop(saga); }}; } @@ -454,7 +704,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -522,7 +772,7 @@ public class SagaActorTest { final String localTxId_3 = UUID.randomUUID().toString(); final int timeout = 5; - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); @@ -584,7 +834,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { @@ -639,7 +889,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { @@ -702,7 +952,7 @@ public class SagaActorTest { final String localTxId_2 = UUID.randomUUID().toString(); final String localTxId_3 = UUID.randomUUID().toString(); - ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java index a3ddd0d..d303f86 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java @@ -124,6 +124,33 @@ public class SagaEventSender { /** * 1. SagaStartedEvent-1 * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. TxStartedEvent-13 + * 7. TxAbortedEvent-13 + * 8. SagaAbortedEvent-1 + * 9. TxComponsitedEvent-11 + * 10. TxComponsitedEvent-12 + */ + public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ + List<BaseEvent> sagaEvents = new ArrayList<>(); + sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + return sagaEvents; + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 * 3. TxAbortedEvent-11 * 4. TxStartedEvent-12 * 5. TxEndedEvent-12 @@ -292,6 +319,37 @@ public class SagaEventSender { sagaEvents.add(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); return sagaEvents; - } + } + + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + */ + public static List<BaseEvent> successfulFirstHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ + List<BaseEvent> sagaEvents = new ArrayList<>(); + sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + return sagaEvents; + } + + /** + * 1. TxEndedEvent-12 + * 2. TxStartedEvent-13 + * 3. TxEndedEvent-13 + * 4. SagaEndedEvent-1 + */ + public static List<BaseEvent> successfulSecondHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ + List<BaseEvent> sagaEvents = new ArrayList<>(); + sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build()); + return sagaEvents; + } } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index 5a213f1..85c8b18 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -36,9 +36,7 @@ import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = {SagaApplication.class}, properties = { "alpha.model.actor.enabled=true", - "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem", - "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local", - "akkaConfig.akka.persistence.snapshot-store.local.dir=target/example/snapshots" + "spring.profiles.active=akka-persistence-redis" }) public class SagaIntegrationTest { @@ -63,6 +61,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMMITTED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED @@ -85,6 +85,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 1 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED; }else{ @@ -106,6 +108,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 2 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.FAILED; @@ -129,6 +133,34 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 + && sagaData.getTxEntityMap().size() == 3 + && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED + && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED + && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED; + }else{ + return false; + } + }); + } + + @Test + public void sagaAbortedEventBeforeTxComponsitedEventTest() { + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + sagaEventBus.post(event); + }); + + await().atMost(1, SECONDS).until(() -> { + SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); + if(sagaData != null){ + return sagaData.getLastState() == SagaActorState.COMPENSATED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED @@ -153,6 +185,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED @@ -177,6 +211,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED @@ -201,6 +237,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.SUSPENDED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED @@ -226,6 +264,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.SUSPENDED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED @@ -250,6 +290,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMMITTED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED @@ -274,6 +316,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMMITTED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED @@ -298,6 +342,8 @@ public class SagaIntegrationTest { SagaData sagaData = LogExtension.LogExtensionProvider.get(system).getSagaData(globalTxId); if(sagaData != null){ return sagaData.getLastState() == SagaActorState.COMPENSATED + && sagaData.getBeginTime() > 0 + && sagaData.getEndTime() >0 && sagaData.getTxEntityMap().size() == 3 && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED diff --git a/alpha/alpha-fsm/src/test/resources/application.yaml b/alpha/alpha-fsm/src/test/resources/application.yaml index b3577a6..9ca5b2e 100644 --- a/alpha/alpha-fsm/src/test/resources/application.yaml +++ b/alpha/alpha-fsm/src/test/resources/application.yaml @@ -15,7 +15,25 @@ ## limitations under the License. ## --------------------------------------------------------------------------- +--- +spring: + profiles: akka-persistence-mem akkaConfig: akka.persistence.journal.plugin: akka.persistence.journal.inmem + akka.persistence.journal.leveldb.dir: target/example/journal akka.persistence.snapshot-store.plugin: akka.persistence.snapshot-store.local - akka.persistence.snapshot-store.local.dir: target/example/snapshots \ No newline at end of file + akka.persistence.snapshot-store.local.dir: target/example/snapshots + +--- +spring: + profiles: akka-persistence-redis +akkaConfig: + akka.persistence.journal.plugin: akka-persistence-redis.journal + akka.persistence.snapshot-store.plugin: akka-persistence-redis.snapshot + akka-persistence-redis: + redis: + mode: simple + host: localhost + port: 6379 + database: 0 + #password: \ No newline at end of file diff --git a/docs/fsm/assets/saga_state_diagram.png b/docs/fsm/assets/saga_state_diagram.png index 349fb72..4bc1ba4 100644 Binary files a/docs/fsm/assets/saga_state_diagram.png and b/docs/fsm/assets/saga_state_diagram.png differ diff --git a/docs/fsm/plantuml/saga-state-diagram.puml b/docs/fsm/plantuml/saga-state-diagram.puml index 429687d..b9233c5 100644 --- a/docs/fsm/plantuml/saga-state-diagram.puml +++ b/docs/fsm/plantuml/saga-state-diagram.puml @@ -33,7 +33,7 @@ FAILED --> COMPENSATED : SagaAbortedEvent<font color=red>:doCompensation</font> FAILED --> SUSPENDED : SagaTimeoutEvent -FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font> +FAILED --> FAILED : TxComponsitedEvent<font color=blue>:UpdateTxEntity</font>\nTxStartedEvent<font color=blue>:AddTxEntity</font>\nTxEndedEvent<font color=red>:doCompensation</font>\nTxComponsitedCheckInternalEvent COMPENSATED --> [*]
