This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 98bc27bba72437e10bdc74d850559058f6441d7a Author: Lei Zhang <[email protected]> AuthorDate: Fri Jun 21 17:07:56 2019 +0800 SCB-1321 Add Alpha FSM prototype code --- alpha/alpha-fsm/pom.xml | 123 ++++ .../servicecomb/pack/alpha/fsm/SagaActor.java | 331 +++++++++++ .../servicecomb/pack/alpha/fsm/SagaActorState.java | 19 + .../apache/servicecomb/pack/alpha/fsm/TxState.java | 8 + .../pack/alpha/fsm/event/SagaAbortedEvent.java | 28 + .../pack/alpha/fsm/event/SagaDomainEvent.java | 14 + .../pack/alpha/fsm/event/SagaEndedEvent.java | 28 + .../pack/alpha/fsm/event/SagaStartedEvent.java | 43 ++ .../pack/alpha/fsm/event/SagaTimeoutEvent.java | 27 + .../pack/alpha/fsm/event/TxAbortedEvent.java | 39 ++ .../pack/alpha/fsm/event/TxComponsitedEvent.java | 38 ++ .../pack/alpha/fsm/event/TxEndedEvent.java | 39 ++ .../pack/alpha/fsm/event/TxStartedEvent.java | 38 ++ .../pack/alpha/fsm/event/base/BaseEvent.java | 26 + .../pack/alpha/fsm/event/base/SagaEvent.java | 14 + .../pack/alpha/fsm/event/base/TxEvent.java | 31 ++ .../servicecomb/pack/alpha/fsm/model/SagaData.java | 131 +++++ .../servicecomb/pack/alpha/fsm/model/TxEntity.java | 95 ++++ .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 619 +++++++++++++++++++++ .../alpha-fsm/src/test/resources/application.conf | 3 + alpha/alpha-fsm/src/test/resources/log4j2.xml | 30 + 21 files changed, 1724 insertions(+) diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml new file mode 100644 index 0000000..faa4e05 --- /dev/null +++ b/alpha/alpha-fsm/pom.xml @@ -0,0 +1,123 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>alpha</artifactId> + <groupId>org.apache.servicecomb.pack</groupId> + <version>0.5.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>alpha-fsm</artifactId> + <name>Pack::Alpha::Fsm</name> + + <properties> + <leveldbjni-all.version>1.8</leveldbjni-all.version> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-persistence_2.12</artifactId> + <version>${akka.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.servicecomb.pack</groupId> + <artifactId>pack-common</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <!-- log --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>javax.persistence</groupId> + <artifactId>javax.persistence-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + 
<groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + + <!-- akka --> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_2.12</artifactId> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-persistence_2.12</artifactId> + </dependency> + <dependency> + <groupId>org.fusesource.leveldbjni</groupId> + <artifactId>leveldbjni-all</artifactId> + <version>${leveldbjni-all.version}</version> + </dependency> + + <!-- For testing the artifacts scope are test--> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + </dependency> + <dependency> + <groupId>com.github.seanyinx</groupId> + <artifactId>unit-scaffolding</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-testkit_2.12</artifactId> + </dependency> + </dependencies> + +</project> diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java new file mode 100644 index 0000000..492d187 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -0,0 +1,331 @@ +package org.apache.servicecomb.pack.alpha.fsm; + + +import akka.actor.Props; +import akka.persistence.fsm.AbstractPersistentFSM; +import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent; +import 
org.apache.servicecomb.pack.alpha.fsm.event.SagaDomainEvent.DomainEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; +import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; +import org.apache.servicecomb.pack.alpha.fsm.model.TxEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +public class SagaActor extends + AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static Props props(String persistenceId) { + return Props.create(SagaActor.class, persistenceId); + } + + private final String persistenceId; + + public SagaActor(String persistenceId) { + this.persistenceId = persistenceId; + + startWith(SagaActorState.IDEL, SagaData.builder().build()); + + when(SagaActorState.IDEL, + matchEvent(SagaStartedEvent.class, + (event, data) -> { + data.setGlobalTxId(event.getGlobalTxId()); + data.setBeginTime(System.currentTimeMillis()); + if (event.getTimeout() > 0) { + data.setExpirationTime(data.getBeginTime() + event.getTimeout() * 1000); + return goTo(SagaActorState.READY) + .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); + } else { + return goTo(SagaActorState.READY); + } + } + + ) + ); + + when(SagaActorState.READY, + 
matchEvent(TxStartedEvent.class, SagaData.class, + (event, data) -> { + updateTxEntity(event, data); + if (data.getExpirationTime() > 0) { + return goTo(SagaActorState.PARTIALLY_ACTIVE) + .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); + } else { + return goTo(SagaActorState.PARTIALLY_ACTIVE); + } + } + ).event(SagaEndedEvent.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED).replying(data); + } + ).event(SagaAbortedEvent.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED).replying(data); + } + ).event(Arrays.asList(StateTimeout()), SagaData.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED) + .replying(data); + }) + ); + + when(SagaActorState.PARTIALLY_ACTIVE, + matchEvent(TxEndedEvent.class, SagaData.class, + (event, data) -> { + updateTxEntity(event, data); + if (data.getExpirationTime() > 0) { + return goTo(SagaActorState.PARTIALLY_COMMITTED) + .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); + } else { + return goTo(SagaActorState.PARTIALLY_COMMITTED); + } + } + ).event(SagaTimeoutEvent.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED) + .replying(data) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); + } + ).event(TxAbortedEvent.class, + (event, data) -> { + updateTxEntity(event, data); + return goTo(SagaActorState.FAILED); + } + ).event(Arrays.asList(StateTimeout()), SagaData.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED).replying(data); + }) + ); + + when(SagaActorState.PARTIALLY_COMMITTED, + matchEvent(TxStartedEvent.class, + (event, data) -> { + updateTxEntity(event, data); + if (data.getExpirationTime() > 0) { + return goTo(SagaActorState.PARTIALLY_ACTIVE) + .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); + } else { + return goTo(SagaActorState.PARTIALLY_ACTIVE); + } + } + ).event(SagaTimeoutEvent.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED) + .replying(data) + 
.forMax(Duration.create(1, TimeUnit.MILLISECONDS)); + } + ).event(SagaEndedEvent.class, + (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + return goTo(SagaActorState.COMMITTED) + .replying(data) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); + } + ).event(SagaAbortedEvent.class, + (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + updateTxEntity(event, data); + return goTo(SagaActorState.FAILED); + } + ).event(TxAbortedEvent.class, + (event, data) -> { + updateTxEntity(event, data); + return goTo(SagaActorState.FAILED); + } + ).event(Arrays.asList(StateTimeout()), SagaData.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED).replying(data); + }) + ); + + when(SagaActorState.FAILED, + matchEvent(SagaTimeoutEvent.class, SagaData.class, + (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + return goTo(SagaActorState.SUSPENDED) + .replying(data) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); + } + ).event(TxComponsitedEvent.class, SagaData.class, + (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + updateTxEntity(event, data); + if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0) + || hasCommittedTx(data)) { + return stay(); + } else { + return goTo(SagaActorState.COMPENSATED) + .replying(data) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); + } + } + ).event(SagaAbortedEvent.class, SagaData.class, + (event, data) -> { + data.setEndTime(System.currentTimeMillis()); + updateTxEntity(event, data); + data.setTerminated(true); + if ((!data.isTerminated() && data.getCompensationRunningCounter().intValue() > 0) + || hasCommittedTx(data)) { + return stay(); + }else{ + return goTo(SagaActorState.COMPENSATED) + .replying(data) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); + } + } + ).event(TxStartedEvent.class, SagaData.class, + (event, data) -> { + updateTxEntity(event, data); + return stay(); + } + ).event(TxEndedEvent.class, 
SagaData.class, + (event, data) -> { + updateTxEntity(event, data); + // TODO 调用补偿方法 + TxEntity txEntity = data.getTxEntityMap().get(event.getLocalTxId()); + compensation(txEntity, data); + return stay(); + } + ).event(Arrays.asList(StateTimeout()), SagaData.class, + (event, data) -> { + return goTo(SagaActorState.SUSPENDED).replying(data); + }) + ); + + when(SagaActorState.COMMITTED, + matchAnyEvent( + (event, data) -> { + return stop(); + } + ) + ); + + when(SagaActorState.SUSPENDED, + matchAnyEvent( + (event, data) -> { + return stop(); + } + ) + ); + + when(SagaActorState.COMPENSATED, + matchAnyEvent( + (event, data) -> { + return stop(); + } + ) + ); + + whenUnhandled( + matchAnyEvent((event, data) -> { + LOG.error("unmatch event {}", event); + return stay(); + }) + ); + + onTransition( + matchState(null, null, (from, to) -> { + LOG.info("transition {} {} -> {}", getSelf(), from, to); + }) + ); + + } + + @Override + public void onRecoveryCompleted() { + LOG.info("onRecoveryCompleted: {} {}", stateName(), stateData()); + } + + @Override + public Class domainEventClass() { + return SagaDomainEvent.DomainEvent.class; + } + + + @Override + public String persistenceId() { + return persistenceId; + } + + @Override + public SagaData applyEvent(DomainEvent domainEvent, SagaData currentData) { + return currentData; + } + + private void updateTxEntity(BaseEvent event, SagaData data) { + if (event instanceof TxEvent) { + TxEvent txEvent = (TxEvent) event; + if (!data.getTxEntityMap().containsKey(txEvent.getLocalTxId())) { + if (event instanceof TxStartedEvent) { + TxEntity txEntity = TxEntity.builder() + .localTxId(txEvent.getLocalTxId()) + .parentTxId(txEvent.getParentTxId()) + .state(TxState.ACTIVE) + .build(); + data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); + } + } else { + TxEntity txEntity = data.getTxEntityMap().get(txEvent.getLocalTxId()); + if (event instanceof TxEndedEvent) { + if (txEntity.getState() == TxState.ACTIVE) { + 
txEntity.setEndTime(System.currentTimeMillis()); + txEntity.setState(TxState.COMMITTED); + } + } else if (event instanceof TxAbortedEvent) { + if (txEntity.getState() == TxState.ACTIVE) { + txEntity.setEndTime(System.currentTimeMillis()); + txEntity.setState(TxState.FAILED); + // TODO 调用补偿方法 + data.getTxEntityMap().forEach((k, v) -> { + if (v.getState() == TxState.COMMITTED) { + // TODO 调用补偿方法 + compensation(v, data); + } + }); + } + } else if (event instanceof TxComponsitedEvent) { + //补偿中计数器减一 + data.getCompensationRunningCounter().decrementAndGet(); + txEntity.setState(TxState.COMPENSATED); + LOG.info("完成补偿 {}",txEntity.getLocalTxId()); + } + } + } else if (event instanceof SagaEvent) { + if (event instanceof SagaAbortedEvent) { + data.getTxEntityMap().forEach((k, v) -> { + if (v.getState() == TxState.COMMITTED) { + // TODO 调用补偿方法 + compensation(v, data); + } + }); + } + } + } + + private boolean hasCommittedTx(SagaData data) { + return data.getTxEntityMap().entrySet().stream() + .filter(map -> map.getValue().getState() == TxState.COMMITTED) + .count() > 0; + } + + private void compensation(TxEntity txEntity, SagaData data) { + //补偿中计数器加一 + data.getCompensationRunningCounter().incrementAndGet(); + LOG.info("调用补偿方法 {}", txEntity.getLocalTxId()); + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java new file mode 100644 index 0000000..0c7be27 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorState.java @@ -0,0 +1,19 @@ +package org.apache.servicecomb.pack.alpha.fsm; + +import akka.persistence.fsm.PersistentFSM; + +public enum SagaActorState implements PersistentFSM.FSMState { + IDEL, + READY, + PARTIALLY_ACTIVE, + PARTIALLY_COMMITTED, + FAILED, + COMMITTED, + COMPENSATED, + SUSPENDED; + + @Override + public String identifier() { + return name(); + } +} diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java new file mode 100644 index 0000000..e9c54e4 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/TxState.java @@ -0,0 +1,8 @@ +package org.apache.servicecomb.pack.alpha.fsm; + +public enum TxState { + ACTIVE, + FAILED, + COMMITTED, + COMPENSATED; +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java new file mode 100644 index 0000000..351146b --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java @@ -0,0 +1,28 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; + +public class SagaAbortedEvent extends SagaEvent { + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private SagaAbortedEvent sagaAbortedEvent; + + private Builder() { + sagaAbortedEvent = new SagaAbortedEvent(); + } + + public Builder globalTxId(String globalTxId) { + sagaAbortedEvent.setGlobalTxId(globalTxId); + return this; + } + + public SagaAbortedEvent build() { + return sagaAbortedEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java new file mode 100644 index 0000000..b421c73 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaDomainEvent.java @@ -0,0 +1,14 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +public class SagaDomainEvent { + public interface DomainEvent {} + + public enum SagaStartedEvent implements DomainEvent {INSTANCE} 
+ public enum SagaEndedEvent implements DomainEvent {INSTANCE} + public enum SagaAbortedEvent implements DomainEvent {INSTANCE} + public enum SagaTimeoutEvent implements DomainEvent {INSTANCE} + public enum TxStartedEvent implements DomainEvent {INSTANCE} + public enum TxEndedEvent implements DomainEvent {INSTANCE} + public enum TxAbortedEvent implements DomainEvent {INSTANCE} + public enum TxComponsitedEvent implements DomainEvent {INSTANCE} +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java new file mode 100644 index 0000000..7a87d77 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java @@ -0,0 +1,28 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; + +public class SagaEndedEvent extends SagaEvent { + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private SagaEndedEvent sagaEndedEvent; + + private Builder() { + sagaEndedEvent = new SagaEndedEvent(); + } + + public Builder globalTxId(String globalTxId) { + sagaEndedEvent.setGlobalTxId(globalTxId); + return this; + } + + public SagaEndedEvent build() { + return sagaEndedEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java new file mode 100644 index 0000000..4831be7 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java @@ -0,0 +1,43 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + + +import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; + +public class SagaStartedEvent extends SagaEvent { + private int timeout; 
//second + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private SagaStartedEvent sagaStartedEvent; + + private Builder() { + sagaStartedEvent = new SagaStartedEvent(); + } + + public Builder globalTxId(String globalTxId) { + sagaStartedEvent.setGlobalTxId(globalTxId); + return this; + } + + public Builder timeout(int timeout) { + sagaStartedEvent.setTimeout(timeout); + return this; + } + + public SagaStartedEvent build() { + return sagaStartedEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java new file mode 100644 index 0000000..4de5c37 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java @@ -0,0 +1,27 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; + +public class SagaTimeoutEvent extends SagaEvent { + + public static Builder builder() { + return new Builder(); + } + public static final class Builder { + + private SagaTimeoutEvent sagaTimeoutEvent; + + private Builder() { + sagaTimeoutEvent = new SagaTimeoutEvent(); + } + + public Builder globalTxId(String globalTxId) { + sagaTimeoutEvent.setGlobalTxId(globalTxId); + return this; + } + + public SagaTimeoutEvent build() { + return sagaTimeoutEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java new file mode 100644 index 0000000..620366f --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java @@ -0,0 
+1,39 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; + +public class TxAbortedEvent extends TxEvent { + + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private TxAbortedEvent txAbortedEvent; + + private Builder() { + txAbortedEvent = new TxAbortedEvent(); + } + + public Builder parentTxId(String parentTxId) { + txAbortedEvent.setParentTxId(parentTxId); + return this; + } + + public Builder localTxId(String localTxId) { + txAbortedEvent.setLocalTxId(localTxId); + return this; + } + + public Builder globalTxId(String globalTxId) { + txAbortedEvent.setGlobalTxId(globalTxId); + return this; + } + + public TxAbortedEvent build() { + return txAbortedEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java new file mode 100644 index 0000000..cccb69a --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxComponsitedEvent.java @@ -0,0 +1,38 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; + +public class TxComponsitedEvent extends TxEvent { + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private TxComponsitedEvent txComponsitedEvent; + + private Builder() { + txComponsitedEvent = new TxComponsitedEvent(); + } + + public Builder parentTxId(String parentTxId) { + txComponsitedEvent.setParentTxId(parentTxId); + return this; + } + + public Builder localTxId(String localTxId) { + txComponsitedEvent.setLocalTxId(localTxId); + return this; + } + + public Builder globalTxId(String globalTxId) { + txComponsitedEvent.setGlobalTxId(globalTxId); + return this; + } + + public TxComponsitedEvent build() { 
+ return txComponsitedEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java new file mode 100644 index 0000000..33ea3d1 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java @@ -0,0 +1,39 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; + +public class TxEndedEvent extends TxEvent { + + public static Builder builder() { + return new Builder(); + } + + + public static final class Builder { + + private TxEndedEvent txEndedEvent; + + private Builder() { + txEndedEvent = new TxEndedEvent(); + } + + public Builder parentTxId(String parentTxId) { + txEndedEvent.setParentTxId(parentTxId); + return this; + } + + public Builder localTxId(String localTxId) { + txEndedEvent.setLocalTxId(localTxId); + return this; + } + + public Builder globalTxId(String globalTxId) { + txEndedEvent.setGlobalTxId(globalTxId); + return this; + } + + public TxEndedEvent build() { + return txEndedEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java new file mode 100644 index 0000000..2d10462 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java @@ -0,0 +1,38 @@ +package org.apache.servicecomb.pack.alpha.fsm.event; + +import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; + +public class TxStartedEvent extends TxEvent { + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private TxStartedEvent txStartedEvent; + + private Builder() { + txStartedEvent = new TxStartedEvent(); + } + + public Builder parentTxId(String 
parentTxId) { + txStartedEvent.setParentTxId(parentTxId); + return this; + } + + public Builder localTxId(String localTxId) { + txStartedEvent.setLocalTxId(localTxId); + return this; + } + + public Builder globalTxId(String globalTxId) { + txStartedEvent.setGlobalTxId(globalTxId); + return this; + } + + public TxStartedEvent build() { + return txStartedEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java new file mode 100644 index 0000000..ea8257d --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java @@ -0,0 +1,26 @@ +package org.apache.servicecomb.pack.alpha.fsm.event.base; + +import java.io.Serializable; + +public abstract class BaseEvent implements Serializable { + private String globalTxId; + + public BaseEvent() { + + } + + public String getGlobalTxId() { + return globalTxId; + } + + public void setGlobalTxId(String globalTxId) { + this.globalTxId = globalTxId; + } + + @Override + public String toString() { + return "BaseEvent{" + + "globalTxId='" + globalTxId + '\'' + + '}'; + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java new file mode 100644 index 0000000..bf39855 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java @@ -0,0 +1,14 @@ +package org.apache.servicecomb.pack.alpha.fsm.event.base; + +public class SagaEvent extends BaseEvent { + + public SagaEvent() { + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + + "globalTxId='" + this.getGlobalTxId() + '\'' + + '}'; + } +} diff --git 
a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java new file mode 100644 index 0000000..56a06e0 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java @@ -0,0 +1,31 @@ +package org.apache.servicecomb.pack.alpha.fsm.event.base; + +public abstract class TxEvent extends BaseEvent { + private String parentTxId; + private String localTxId; + + public String getParentTxId() { + return parentTxId; + } + + public void setParentTxId(String parentTxId) { + this.parentTxId = parentTxId; + } + + public String getLocalTxId() { + return localTxId; + } + + public void setLocalTxId(String localTxId) { + this.localTxId = localTxId; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "{" + + "globalTxId='" + this.getGlobalTxId() + '\'' + + "parentTxId='" + parentTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + '}'; + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java new file mode 100644 index 0000000..0cc858a --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java @@ -0,0 +1,131 @@ +package org.apache.servicecomb.pack.alpha.fsm.model; + + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class SagaData implements Serializable { + private long beginTime = System.currentTimeMillis(); + private long endTime; + private String globalTxId; + private long expirationTime; + private boolean terminated; + private AtomicLong compensationRunningCounter = new AtomicLong(); + private Map<String,TxEntity> txEntityMap = new HashMap<>(); + + public long getBeginTime() { + return 
beginTime; + } + + public void setBeginTime(long beginTime) { + this.beginTime = beginTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public String getGlobalTxId() { + return globalTxId; + } + + public void setGlobalTxId(String globalTxId) { + this.globalTxId = globalTxId; + } + + public long getExpirationTime() { + return expirationTime; + } + + public void setExpirationTime(long expirationTime) { + this.expirationTime = expirationTime; + } + + public boolean isTerminated() { + return terminated; + } + + public void setTerminated(boolean terminated) { + this.terminated = terminated; + } + + public AtomicLong getCompensationRunningCounter() { + return compensationRunningCounter; + } + + public void setCompensationRunningCounter( + AtomicLong compensationRunningCounter) { + this.compensationRunningCounter = compensationRunningCounter; + } + + public Map<String, TxEntity> getTxEntityMap() { + return txEntityMap; + } + + public void setTxEntityMap( + Map<String, TxEntity> txEntityMap) { + this.txEntityMap = txEntityMap; + } + + public long getTimeout(){ + return expirationTime-System.currentTimeMillis(); + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private SagaData sagaData; + + private Builder() { + sagaData = new SagaData(); + } + + public Builder beginTime(long beginTime) { + sagaData.setBeginTime(beginTime); + return this; + } + + public Builder endTime(long endTime) { + sagaData.setEndTime(endTime); + return this; + } + + public Builder globalTxId(String globalTxId) { + sagaData.setGlobalTxId(globalTxId); + return this; + } + + public Builder expirationTime(long expirationTime) { + sagaData.setExpirationTime(expirationTime); + return this; + } + + public Builder terminated(boolean terminated) { + sagaData.setTerminated(terminated); + return this; + } + + public Builder 
compensationRunningCounter(AtomicLong compensationRunningCounter) { + sagaData.setCompensationRunningCounter(compensationRunningCounter); + return this; + } + + public Builder txEntityMap(Map<String, TxEntity> txEntityMap) { + sagaData.setTxEntityMap(txEntityMap); + return this; + } + + public SagaData build() { + return sagaData; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java new file mode 100644 index 0000000..f6a3222 --- /dev/null +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/TxEntity.java @@ -0,0 +1,95 @@ +package org.apache.servicecomb.pack.alpha.fsm.model; + + +import java.io.Serializable; +import org.apache.servicecomb.pack.alpha.fsm.TxState; + +public class TxEntity implements Serializable { + private long beginTime = System.currentTimeMillis(); + private long endTime; + private String parentTxId; + private String localTxId; + private TxState state; + + public long getBeginTime() { + return beginTime; + } + + public void setBeginTime(long beginTime) { + this.beginTime = beginTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public String getParentTxId() { + return parentTxId; + } + + public void setParentTxId(String parentTxId) { + this.parentTxId = parentTxId; + } + + public String getLocalTxId() { + return localTxId; + } + + public void setLocalTxId(String localTxId) { + this.localTxId = localTxId; + } + + public TxState getState() { + return state; + } + + public void setState(TxState state) { + this.state = state; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private TxEntity txEntity; + + private Builder() { + txEntity = new TxEntity(); + } + + public Builder beginTime(long beginTime) { + 
txEntity.setBeginTime(beginTime); + return this; + } + + public Builder endTime(long endTime) { + txEntity.setEndTime(endTime); + return this; + } + + public Builder parentTxId(String parentTxId) { + txEntity.setParentTxId(parentTxId); + return this; + } + + public Builder localTxId(String localTxId) { + txEntity.setLocalTxId(localTxId); + return this; + } + + public Builder state(TxState state) { + txEntity.setState(state); + return this; + } + + public TxEntity build() { + return txEntity; + } + } +} diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java new file mode 100644 index 0000000..156d67e --- /dev/null +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -0,0 +1,619 @@ +package org.apache.servicecomb.pack.alpha.fsm; + +import static org.junit.Assert.assertEquals; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Terminated; +import akka.persistence.fsm.PersistentFSM; +import akka.persistence.fsm.PersistentFSM.CurrentState; +import akka.testkit.javadsl.TestKit; +import java.time.Duration; +import java.util.UUID; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaAbortedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaEndedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaTimeoutEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxComponsitedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent; +import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class SagaActorTest { + + static 
ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("SagaActorTest"); + } + + @AfterClass + public static void tearDown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + public String genPersistenceId() { + return UUID.randomUUID().toString(); + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. TxStartedEvent-13 + * 7. TxEndedEvent-13 + * 8. SagaEndedEvent-1 + */ + @Test + public void successfulTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = 
expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxAbortedEvent-11 + * 4. 
SagaAbortedEvent-1 + */ + @Test + public void firstTxAbortedEventTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 1); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = 
expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxAbortedEvent-12 + * 6. TxComponsitedEvent-11 + * 7. SagaAbortedEvent-1 + */ + @Test + public void middleTxAbortedEventTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, 
SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 2); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. TxStartedEvent-13 + * 7. TxAbortedEvent-13 + * 8. TxComponsitedEvent-11 + * 9. TxComponsitedEvent-12 + * 10. 
SagaAbortedEvent-1 + */ + @Test + public void lastTxAbortedEventTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, 
SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxAbortedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. 
TxStartedEvent-13 + * 7. TxEndedEvent-13 + * 8. TxComponsitedEvent-12 + * 9. TxComponsitedEvent-13 + * 10. SagaAbortedEvent-1 + */ + @Test + public void receivedRemainingEventAfterfirstTxAbortedEventTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + 
PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; 
+ } + + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. TxStartedEvent-13 + * 7. TxEndedEvent-13 + * 8. SagaAbortedEvent-1 + * 9. TxComponsitedEvent-11 + * 10. TxComponsitedEvent-12 + * 11. TxComponsitedEvent-13 + */ + @Test + public void sagaAbortedEventAfterAllTxEndedTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(SagaAbortedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + 
saga.tell(TxComponsitedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), 
TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + + /** + * 1. SagaStartedEvent-1 + * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. TxStartedEvent-13 + * 7. TxEndedEvent-13 + * 8. SagaTimeoutEvent-1 + */ + @Test + public void omegaSendSagaTimeoutEventTest() { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + 
saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(SagaTimeoutEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + SagaData sagaData = expectMsgClass(SagaData.class); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), 
TxState.COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + /** + * 1. SagaStartedEvent(5s)-1 + * 2. TxStartedEvent-11 + * 3. TxEndedEvent-11 + * 4. TxStartedEvent-12 + * 5. TxEndedEvent-12 + * 6. TxStartedEvent-13 + * 7. TxEndedEvent-13 + * 8. SagaEndedEvent-1 + */ + @Test + public void sagaActorTriggerTimeoutTest() throws InterruptedException { + new TestKit(system) {{ + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + final int timeout = 5; + + ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId()), "saga"); + watch(saga); + saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); + saga.tell(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build(), getRef()); + saga.tell(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + saga.tell(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build(), getRef()); + Thread.sleep(timeout*2000); + 
saga.tell(SagaEndedEvent.builder().globalTxId(globalTxId).build(), getRef()); + + //expect + CurrentState currentState = expectMsgClass(PersistentFSM.CurrentState.class); + assertEquals(SagaActorState.IDEL, currentState.state()); + PersistentFSM.Transition transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.IDEL, SagaActorState.READY); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.READY, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.PARTIALLY_ACTIVE); + + transition = expectMsgClass(PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); + + transition = expectMsgClass(Duration.ofSeconds(timeout+2),PersistentFSM.Transition.class); + assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED); + + Terminated terminated = expectMsgClass(Terminated.class); + assertEquals(terminated.getActor(), saga); + + system.stop(saga); + }}; + } + + private static void assertSagaTransition(PersistentFSM.Transition transition, ActorRef actorRef, + SagaActorState from, SagaActorState to) { + assertEquals(transition.fsmRef(), actorRef); + 
assertEquals(transition.from(), from); + assertEquals(transition.to(), to); + } + +} diff --git a/alpha/alpha-fsm/src/test/resources/application.conf b/alpha/alpha-fsm/src/test/resources/application.conf new file mode 100644 index 0000000..1265036 --- /dev/null +++ b/alpha/alpha-fsm/src/test/resources/application.conf @@ -0,0 +1,3 @@ +akka.persistence.journal.plugin = "akka.persistence.journal.inmem" +akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" +akka.persistence.snapshot-store.local.dir = "target/example/snapshots" \ No newline at end of file diff --git a/alpha/alpha-fsm/src/test/resources/log4j2.xml b/alpha/alpha-fsm/src/test/resources/log4j2.xml new file mode 100644 index 0000000..58924c6 --- /dev/null +++ b/alpha/alpha-fsm/src/test/resources/log4j2.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration>
