This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 4f49d1786148cc201ea9fd3362a9e2f1374648e2 Author: Lei Zhang <[email protected]> AuthorDate: Mon Jul 8 19:58:34 2019 +0800 SCB-1321 SagaActor Refactoring SagaActor Persistent & Recover SagaActor call compensation --- .../pack/alpha/core/fsm/PackSagaEvent.java | 150 +++++++++++ .../pack/alpha/fsm/FsmAutoConfiguration.java | 2 +- .../servicecomb/pack/alpha/fsm/SagaActor.java | 130 ++++------ .../pack/alpha/fsm/domain/AddTxEventDomain.java | 61 +---- .../pack/alpha/fsm/domain/DomainEvent.java | 3 +- .../pack/alpha/fsm/domain/SagaEndedDomain.java | 12 +- .../pack/alpha/fsm/domain/SagaStartedDomain.java | 35 +-- .../pack/alpha/fsm/domain/UpdateTxEventDomain.java | 36 ++- .../pack/alpha/fsm/event/SagaAbortedEvent.java | 16 ++ .../pack/alpha/fsm/event/SagaEndedEvent.java | 18 ++ .../pack/alpha/fsm/event/SagaStartedEvent.java | 25 ++ .../pack/alpha/fsm/event/SagaTimeoutEvent.java | 16 ++ .../pack/alpha/fsm/event/TxAbortedEvent.java | 11 + .../pack/alpha/fsm/event/TxCompensatedEvent.java | 11 + .../pack/alpha/fsm/event/TxEndedEvent.java | 11 + .../pack/alpha/fsm/event/TxStartedEvent.java | 36 --- .../pack/alpha/fsm/event/base/BaseEvent.java | 55 ++++- .../pack/alpha/fsm/event/base/SagaEvent.java | 9 - .../pack/alpha/fsm/event/base/TxEvent.java | 44 ---- .../fsm/event/consumer/SagaEventConsumer.java | 7 - .../servicecomb/pack/alpha/fsm/model/SagaData.java | 84 ++++++- .../spring/integration/akka/SagaDataExtension.java | 13 +- .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 77 ++++-- .../pack/alpha/fsm/SagaEventSender.java | 218 ++++++++-------- .../pack/alpha/fsm/SagaIntegrationTest.java | 275 +++++++++------------ 25 files changed, 801 insertions(+), 554 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/PackSagaEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/PackSagaEvent.java new file mode 100644 index 0000000..1588c55 --- /dev/null +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/pack/alpha/core/fsm/PackSagaEvent.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.pack.alpha.core.fsm; + +import java.util.Date; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.Transient; + +@Entity +@Table(name = "PACK_SAGA_EVENT") +public class PackSagaEvent { + @Transient + public static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 00:00:00 + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long surrogateId; + + private String serviceName; + private String instanceId; + private Date creationTime; + private String globalTxId; + private String localTxId; + private String parentTxId; + private String type; + private String compensationMethod; + private Date expiryTime; + private String retryMethod; + private int retries; + private byte[] payloads; + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private String serviceName; + private String instanceId; + private Date creationTime; + private String globalTxId; + private String localTxId; + private String parentTxId; + private String type; + private String compensationMethod; + private Date expiryTime; + private String retryMethod; + private int retries; + private byte[] payloads; + + private Builder() { + } + + public Builder serviceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + public Builder instanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + + public Builder creationTime(Date creationTime) { + this.creationTime = creationTime; + return this; + } + + public Builder globalTxId(String globalTxId) { + this.globalTxId = globalTxId; + return this; + } + + public Builder localTxId(String localTxId) { + this.localTxId = localTxId; + return this; + } + + public Builder parentTxId(String parentTxId) { + this.parentTxId = parentTxId; + return this; + } + + public Builder type(String type) { + this.type = type; + return this; + } + + public Builder compensationMethod(String compensationMethod) { + this.compensationMethod = compensationMethod; + return this; + } + + public Builder expiryTime(Date expiryTime) { + this.expiryTime = expiryTime; + return this; + } + + public Builder retryMethod(String retryMethod) { + this.retryMethod = retryMethod; + return this; + } + + public Builder retries(int retries) { + this.retries = retries; + return this; + } + + public Builder payloads(byte[] payloads) { + this.payloads = payloads; + return this; + } + + public PackSagaEvent build() { + PackSagaEvent packSagaEvent = new PackSagaEvent(); + packSagaEvent.instanceId = this.instanceId; + packSagaEvent.serviceName = this.serviceName; + packSagaEvent.localTxId = this.localTxId; + packSagaEvent.retryMethod = this.retryMethod; + packSagaEvent.creationTime = this.creationTime; + packSagaEvent.compensationMethod = this.compensationMethod; + packSagaEvent.payloads = this.payloads; + packSagaEvent.globalTxId = this.globalTxId; + packSagaEvent.retries = this.retries; + packSagaEvent.type = this.type; + packSagaEvent.parentTxId = this.parentTxId; + packSagaEvent.expiryTime = this.expiryTime; + return packSagaEvent; + } + } +} diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java index c92804d..5371159 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java @@ -34,7 +34,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.core.env.ConfigurableEnvironment; @Configuration -@ConditionalOnProperty(value = {"alpha.model.actor.enabled"}) +@ConditionalOnProperty(value = {"alpha.feature.akka.enabled"}) public class FsmAutoConfiguration { @Bean diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index 712a5f4..188f156 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -21,6 +21,7 @@ import akka.actor.Props; import akka.persistence.fsm.AbstractPersistentFSM; import java.lang.invoke.MethodHandles; import java.util.Arrays; +import java.util.Date; import java.util.concurrent.TimeUnit; import org.apache.servicecomb.pack.alpha.core.AlphaException; import org.apache.servicecomb.pack.alpha.fsm.domain.AddTxEventDomain; @@ -65,12 +66,11 @@ public class SagaActor extends when(SagaActorState.IDEL, matchEvent(SagaStartedEvent.class, (event, data) -> { - SagaStartedDomain domainEvent = new SagaStartedDomain(event.getGlobalTxId(), - event.getCreateTime(), event.getTimeout()); + SagaStartedDomain domainEvent = new SagaStartedDomain(event); if (event.getTimeout() > 0) { return goTo(SagaActorState.READY) .applying(domainEvent) - .forMax(Duration.create(event.getTimeout(), TimeUnit.MILLISECONDS)); + .forMax(Duration.create(event.getTimeout(), TimeUnit.SECONDS)); } else { return goTo(SagaActorState.READY) .applying(domainEvent); @@ -83,15 +83,8 @@ public class SagaActor extends when(SagaActorState.READY, matchEvent(TxStartedEvent.class, SagaData.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain( - event.getServiceName(), - event.getInstanceId(), - event.getParentTxId(), - event.getLocalTxId(), - event.getPayloads(), - event.getCompensationMethod(), - event.getRetries()); - if (data.getExpirationTime() > 0) { + AddTxEventDomain domainEvent = new AddTxEventDomain(event); + if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_ACTIVE) .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); @@ -102,21 +95,21 @@ public class SagaActor extends } ).event(SagaEndedEvent.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) .replying(data); } ).event(SagaAbortedEvent.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) .replying(data); } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) .replying(data); @@ -126,9 +119,8 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_ACTIVE, matchEvent(TxEndedEvent.class, SagaData.class, (event, data) -> { - UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMMITTED, new byte[0]); - if (data.getExpirationTime() > 0) { + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); + if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_COMMITTED) .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); @@ -139,15 +131,8 @@ public class SagaActor extends } ).event(TxStartedEvent.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain( - event.getServiceName(), - event.getInstanceId(), - event.getParentTxId(), - event.getLocalTxId(), - event.getPayloads(), - event.getCompensationMethod(), - event.getRetries()); - if (data.getExpirationTime() > 0) { + AddTxEventDomain domainEvent = new AddTxEventDomain(event); + if (data.getExpirationTime() != null) { return stay() .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); @@ -157,7 +142,7 @@ public class SagaActor extends } ).event(SagaTimeoutEvent.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) .replying(data) @@ -165,8 +150,7 @@ public class SagaActor extends } ).event(TxAbortedEvent.class, (event, data) -> { - UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.FAILED, event.getPayloads()); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return goTo(SagaActorState.FAILED) .applying(domainEvent); } @@ -179,15 +163,8 @@ public class SagaActor extends when(SagaActorState.PARTIALLY_COMMITTED, matchEvent(TxStartedEvent.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain( - event.getServiceName(), - event.getInstanceId(), - event.getParentTxId(), - event.getLocalTxId(), - event.getPayloads(), - event.getCompensationMethod(), - event.getRetries()); - if (data.getExpirationTime() > 0) { + AddTxEventDomain domainEvent = new AddTxEventDomain(event); + if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_ACTIVE) .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); @@ -198,9 +175,8 @@ public class SagaActor extends } ).event(TxEndedEvent.class, (event, data) -> { - UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMMITTED, new byte[0]); - if (data.getExpirationTime() > 0) { + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); + if (data.getExpirationTime() != null) { return stay() .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); @@ -210,7 +186,7 @@ public class SagaActor extends } ).event(SagaTimeoutEvent.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) .replying(data) @@ -218,7 +194,7 @@ public class SagaActor extends } ).event(SagaEndedEvent.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.COMMITTED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED); return goTo(SagaActorState.COMMITTED) .applying(domainEvent) .replying(data) @@ -226,13 +202,12 @@ public class SagaActor extends } ).event(SagaAbortedEvent.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); return goTo(SagaActorState.FAILED).applying(domainEvent); } ).event(TxAbortedEvent.class, (event, data) -> { - UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.FAILED, event.getPayloads()); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return goTo(SagaActorState.FAILED).applying(domainEvent); } ).event(Arrays.asList(StateTimeout()), SagaData.class, @@ -244,7 +219,7 @@ public class SagaActor extends when(SagaActorState.FAILED, matchEvent(SagaTimeoutEvent.class, SagaData.class, (event, data) -> { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) .replying(data) @@ -252,8 +227,7 @@ public class SagaActor extends } ).event(TxCompensatedEvent.class, SagaData.class, (event, data) -> { - UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMPENSATED, new byte[0]); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return stay().applying(domainEvent).andThen(exec(_data -> { self().tell(TxComponsitedCheckInternalEvent.builder().build(), self()); })); @@ -261,11 +235,9 @@ public class SagaActor extends ).event(TxComponsitedCheckInternalEvent.class, SagaData.class, (event, data) -> { if (hasCompensationSentTx(data) || !data.isTerminated()) { - //if (hasCompensationSentTx(data)) { return stay().replying(data); } else { - SagaEndedDomain domainEvent = new SagaEndedDomain( - SagaActorState.COMPENSATED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED); return goTo(SagaActorState.COMPENSATED) .applying(domainEvent) .replying(data) @@ -276,13 +248,13 @@ public class SagaActor extends (event, data) -> { data.setTerminated(true); if (hasCommittedTx(data)) { - SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.FAILED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); return stay().replying(data).applying(domainEvent); } else if (hasCompensationSentTx(data)) { - return stay().replying(data); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); + return stay().replying(data).applying(domainEvent); } else { - SagaEndedDomain domainEvent = new SagaEndedDomain( - SagaActorState.COMPENSATED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED); return goTo(SagaActorState.COMPENSATED) .applying(domainEvent) .replying(data) @@ -291,20 +263,12 @@ public class SagaActor extends } ).event(TxStartedEvent.class, SagaData.class, (event, data) -> { - AddTxEventDomain domainEvent = new AddTxEventDomain( - event.getServiceName(), - event.getInstanceId(), - event.getParentTxId(), - event.getLocalTxId(), - event.getPayloads(), - event.getCompensationMethod(), - event.getRetries()); + AddTxEventDomain domainEvent = new AddTxEventDomain(event); return stay().applying(domainEvent); } ).event(TxEndedEvent.class, SagaData.class, (event, data) -> { - UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event.getParentTxId(), - event.getLocalTxId(), TxState.COMMITTED, new byte[0]); + UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); return stay().applying(domainEvent).andThen(exec(_data -> { TxEntity txEntity = _data.getTxEntityMap().get(event.getLocalTxId()); // call compensate @@ -379,7 +343,7 @@ public class SagaActor extends LOG.info("stop {} {}", data.getGlobalTxId(), state); data.setTerminated(true); data.setLastState(state); - data.setEndTime(System.currentTimeMillis()); + data.setEndTime(new Date()); SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) .putSagaData(data.getGlobalTxId(), data); } @@ -390,27 +354,33 @@ public class SagaActor extends @Override public SagaData applyEvent(DomainEvent event, SagaData data) { + // log event to SagaData + if(event.getEvent() != null && !(event.getEvent() instanceof TxComponsitedCheckInternalEvent)){ + data.logEvent(event.getEvent()); + } if (event instanceof SagaStartedDomain) { SagaStartedDomain domainEvent = (SagaStartedDomain) event; - data.setGlobalTxId(domainEvent.getGlobalTxId()); - data.setBeginTime(domainEvent.getCreateTime()); + data.setServiceName(domainEvent.getEvent().getServiceName()); + data.setInstanceId(domainEvent.getEvent().getInstanceId()); + data.setGlobalTxId(domainEvent.getEvent().getGlobalTxId()); + data.setBeginTime(domainEvent.getEvent().getCreateTime()); data.setExpirationTime(domainEvent.getExpirationTime()); } else if (event instanceof AddTxEventDomain) { AddTxEventDomain domainEvent = (AddTxEventDomain) event; - if (!data.getTxEntityMap().containsKey(domainEvent.getLocalTxId())) { + if (!data.getTxEntityMap().containsKey(domainEvent.getEvent().getLocalTxId())) { TxEntity txEntity = TxEntity.builder() - .serviceName(domainEvent.getServiceName()) - .instanceId(domainEvent.getInstanceId()) - .globalTxId(data.getGlobalTxId()) - .localTxId(domainEvent.getLocalTxId()) - .parentTxId(domainEvent.getParentTxId()) + .serviceName(domainEvent.getEvent().getServiceName()) + .instanceId(domainEvent.getEvent().getInstanceId()) + .globalTxId(domainEvent.getEvent().getGlobalTxId()) + .localTxId(domainEvent.getEvent().getLocalTxId()) + .parentTxId(domainEvent.getEvent().getParentTxId()) .compensationMethod(domainEvent.getCompensationMethod()) .payloads(domainEvent.getPayloads()) .state(domainEvent.getState()) .build(); data.getTxEntityMap().put(txEntity.getLocalTxId(), txEntity); } else { - LOG.warn("TxEntity {} already exists", domainEvent.getLocalTxId()); + LOG.warn("TxEntity {} already exists", domainEvent.getEvent().getLocalTxId()); } } else if (event instanceof UpdateTxEventDomain) { UpdateTxEventDomain domainEvent = (UpdateTxEventDomain) event; @@ -447,13 +417,13 @@ public class SagaActor extends } }); } else if (domainEvent.getState() == SagaActorState.SUSPENDED) { - data.setEndTime(System.currentTimeMillis()); + data.setEndTime(new Date()); data.setTerminated(true); } else if (domainEvent.getState() == SagaActorState.COMPENSATED) { - data.setEndTime(System.currentTimeMillis()); + data.setEndTime(new Date()); data.setTerminated(true); } else if (domainEvent.getState() == SagaActorState.COMMITTED) { - data.setEndTime(System.currentTimeMillis()); + data.setEndTime(new Date()); data.setTerminated(true); } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java index 520b7db..0e6b533 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/AddTxEventDomain.java @@ -18,67 +18,27 @@ package org.apache.servicecomb.pack.alpha.fsm.domain; import org.apache.servicecomb.pack.alpha.fsm.TxState; +import org.apache.servicecomb.pack.alpha.fsm.event.TxStartedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; public class AddTxEventDomain implements DomainEvent { - private String serviceName; - private String instanceId; - private String parentTxId; - private String localTxId; private TxState state = TxState.ACTIVE; private int retries; private String compensationMethod; private byte[] payloads; + private BaseEvent event; - public AddTxEventDomain(String serviceName, String instanceId, String parentTxId, String localTxId, byte[] payloads, String compensationMethod, int retries) { - this.serviceName = serviceName; - this.instanceId = instanceId; - this.parentTxId = parentTxId; - this.localTxId = localTxId; - this.compensationMethod = compensationMethod; - this.payloads = payloads; - this.retries = retries; - } - - public String getServiceName() { - return serviceName; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public String getInstanceId() { - return instanceId; - } - - public void setInstanceId(String instanceId) { - this.instanceId = instanceId; - } - - public String getParentTxId() { - return parentTxId; - } - - public void setParentTxId(String parentTxId) { - this.parentTxId = parentTxId; - } - - public String getLocalTxId() { - return localTxId; - } - - public void setLocalTxId(String localTxId) { - this.localTxId = localTxId; + public AddTxEventDomain(TxStartedEvent event) { + this.event = event; + this.compensationMethod = event.getCompensationMethod(); + this.payloads = event.getPayloads(); + this.retries = event.getRetries(); } public TxState getState() { return state; } - public void setState(TxState state) { - this.state = state; - } - public String getCompensationMethod() { return compensationMethod; } @@ -102,4 +62,9 @@ public class AddTxEventDomain implements DomainEvent { public void setPayloads(byte[] payloads) { this.payloads = payloads; } + + @Override + public BaseEvent getEvent() { + return event; + } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java index 10c293c..e7b6a1c 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/DomainEvent.java @@ -18,7 +18,8 @@ package org.apache.servicecomb.pack.alpha.fsm.domain; import java.io.Serializable; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; public interface DomainEvent extends Serializable { - + BaseEvent getEvent(); } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java index d3e40b1..1f4b216 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java @@ -18,16 +18,26 @@ package org.apache.servicecomb.pack.alpha.fsm.domain; import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; public class SagaEndedDomain implements DomainEvent { private SagaActorState state; + private BaseEvent event; - public SagaEndedDomain(SagaActorState state) { + public SagaEndedDomain(BaseEvent event, SagaActorState state) { + if(event != null){ + this.event = event; + } this.state = state; } public SagaActorState getState() { return state; } + + @Override + public BaseEvent getEvent() { + return event; + } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java index fe75f04..6799e0f 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaStartedDomain.java @@ -17,29 +17,32 @@ package org.apache.servicecomb.pack.alpha.fsm.domain; +import java.util.Calendar; +import java.util.Date; +import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; + public class SagaStartedDomain implements DomainEvent { - private long createTime; - private String globalTxId; - private long expirationTime; + private Date expirationTime; + private BaseEvent event; - public SagaStartedDomain(String globalTxId, long createTime, int timeout) { - this.createTime = createTime; - this.globalTxId = globalTxId; - if (timeout > 0) { - this.expirationTime = System.currentTimeMillis() + timeout * 1000; + public SagaStartedDomain(SagaStartedEvent event) { + this.event = event; + if (event.getTimeout() > 0) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(event.getCreateTime()); + calendar.add(Calendar.SECOND, event.getTimeout()); + this.expirationTime = calendar.getTime(); } } - public long getCreateTime() { - return createTime; - } - - public String getGlobalTxId() { - return globalTxId; + public Date getExpirationTime() { + return expirationTime; } - public long getExpirationTime() { - return expirationTime; + @Override + public BaseEvent getEvent() { + return event; } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java index 2c8831c..f12e784 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/UpdateTxEventDomain.java @@ -18,20 +18,41 @@ package org.apache.servicecomb.pack.alpha.fsm.domain; import org.apache.servicecomb.pack.alpha.fsm.TxState; +import org.apache.servicecomb.pack.alpha.fsm.event.TxAbortedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxCompensatedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.TxEndedEvent; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; public class UpdateTxEventDomain implements DomainEvent { private String parentTxId; private String localTxId; private TxState state; private byte[] throwablePayLoads; + private BaseEvent event; - public UpdateTxEventDomain(String parentTxId, String localTxId, TxState state, byte[] throwablePayLoads) { - this.parentTxId = parentTxId; - this.localTxId = localTxId; - this.state = state; - this.throwablePayLoads = throwablePayLoads; + public UpdateTxEventDomain(TxEndedEvent event) { + this.event = event; + this.parentTxId = event.getParentTxId(); + this.localTxId = event.getLocalTxId(); + this.state = TxState.COMMITTED; + } + + public UpdateTxEventDomain(TxAbortedEvent event) { + this.event = event; + this.parentTxId = event.getParentTxId(); + this.localTxId = event.getLocalTxId(); + this.throwablePayLoads = event.getPayloads(); + this.state = TxState.FAILED; } + public UpdateTxEventDomain(TxCompensatedEvent event) { + this.event = event; + this.parentTxId = event.getParentTxId(); + this.localTxId = event.getLocalTxId(); + this.state = TxState.COMPENSATED; + } + + public String getParentTxId() { return parentTxId; } @@ -63,4 +84,9 @@ public class UpdateTxEventDomain implements DomainEvent { public void setThrowablePayLoads(byte[] throwablePayLoads) { this.throwablePayLoads = throwablePayLoads; } + + @Override + public BaseEvent getEvent() { + return event; + } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java index b26504a..24d96f9 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaAbortedEvent.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; public class SagaAbortedEvent extends SagaEvent { @@ -38,6 +39,21 @@ public class SagaAbortedEvent extends SagaEvent { return this; } + public Builder serviceName(String serviceName) { + sagaAbortedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + sagaAbortedEvent.setInstanceId(instanceId); + return this; + } + + public Builder createTime(Date createTime){ + sagaAbortedEvent.setCreateTime(createTime); + return this; + } + public SagaAbortedEvent build() { return sagaAbortedEvent; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java index 9ea5cf1..c2c5efe 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaEndedEvent.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; public class SagaEndedEvent extends SagaEvent { @@ -35,6 +36,23 @@ public class SagaEndedEvent extends SagaEvent { public Builder globalTxId(String globalTxId) { sagaEndedEvent.setGlobalTxId(globalTxId); + sagaEndedEvent.setLocalTxId(globalTxId); + sagaEndedEvent.setParentTxId(globalTxId); + return this; + } + + public Builder serviceName(String serviceName) { + sagaEndedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + sagaEndedEvent.setInstanceId(instanceId); + return this; + } + + public Builder createTime(Date createTime){ + sagaEndedEvent.setCreateTime(createTime); return this; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java index 56af9cd..7637499 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaStartedEvent.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; public class SagaStartedEvent extends SagaEvent { @@ -34,6 +35,13 @@ public class SagaStartedEvent extends SagaEvent { return new Builder(); } + @Override + public String toString() { + return "SagaStartedEvent{" + + "timeout=" + timeout + + "} " + super.toString(); + } + public static final class Builder { private SagaStartedEvent sagaStartedEvent; @@ -44,6 +52,8 @@ public class SagaStartedEvent extends SagaEvent { public Builder globalTxId(String globalTxId) { sagaStartedEvent.setGlobalTxId(globalTxId); + sagaStartedEvent.setLocalTxId(globalTxId); + sagaStartedEvent.setParentTxId(globalTxId); return this; } @@ -52,6 +62,21 @@ public class SagaStartedEvent extends SagaEvent { return this; } + public Builder serviceName(String serviceName) { + sagaStartedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + sagaStartedEvent.setInstanceId(instanceId); + return this; + } + + public Builder createTime(Date createTime){ + sagaStartedEvent.setCreateTime(createTime); + return this; + } + public SagaStartedEvent build() { return sagaStartedEvent; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java index bed3264..1cb0abd 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/SagaTimeoutEvent.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.SagaEvent; public class SagaTimeoutEvent extends SagaEvent { @@ -32,11 +33,26 @@ public class SagaTimeoutEvent extends SagaEvent { sagaTimeoutEvent = new SagaTimeoutEvent(); } + public Builder serviceName(String serviceName) { + sagaTimeoutEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + sagaTimeoutEvent.setInstanceId(instanceId); + return this; + } + public Builder globalTxId(String globalTxId) { sagaTimeoutEvent.setGlobalTxId(globalTxId); return this; } + public Builder createTime(Date createTime){ + sagaTimeoutEvent.setCreateTime(createTime); + return this; + } + public SagaTimeoutEvent build() { return sagaTimeoutEvent; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java index ca1f9e4..69cb0dd 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxAbortedEvent.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; public class TxAbortedEvent extends TxEvent { @@ -43,6 +44,16 @@ public class TxAbortedEvent extends TxEvent { txAbortedEvent = new TxAbortedEvent(); } + public Builder serviceName(String serviceName) { + txAbortedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + txAbortedEvent.setInstanceId(instanceId); + return this; + } + public Builder parentTxId(String parentTxId) { txAbortedEvent.setParentTxId(parentTxId); return this; diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java index c007993..100a92b 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxCompensatedEvent.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; public class TxCompensatedEvent extends TxEvent { @@ -33,6 +34,16 @@ public class TxCompensatedEvent extends TxEvent { txCompensatedEvent = new TxCompensatedEvent(); } + public Builder serviceName(String serviceName) { + txCompensatedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + txCompensatedEvent.setInstanceId(instanceId); + return this; + } + public Builder parentTxId(String parentTxId) { txCompensatedEvent.setParentTxId(parentTxId); return this; diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java index 5450664..7a1d42e 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxEndedEvent.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm.event; +import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; public class TxEndedEvent extends TxEvent { @@ -34,6 +35,16 @@ public class TxEndedEvent extends TxEvent { txEndedEvent = new TxEndedEvent(); } + public Builder serviceName(String serviceName) { + txEndedEvent.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + txEndedEvent.setInstanceId(instanceId); + return this; + } + public Builder parentTxId(String parentTxId) { txEndedEvent.setParentTxId(parentTxId); return this; diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java index 5f173ef..626e798 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/TxStartedEvent.java @@ -21,34 +21,11 @@ import java.util.Date; import org.apache.servicecomb.pack.alpha.fsm.event.base.TxEvent; public class TxStartedEvent extends TxEvent { - private String serviceName; - private String instanceId; private String compensationMethod; private byte[] payloads; - private Date creationTime; private String retryMethod; private int retries; - @Override - public String getServiceName() { - return serviceName; - } - - @Override - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - @Override - public String getInstanceId() { - return instanceId; - } - - @Override - public void setInstanceId(String instanceId) { - this.instanceId = instanceId; - } - public String getCompensationMethod() { return compensationMethod; } @@ -65,14 +42,6 @@ public class TxStartedEvent extends TxEvent { this.payloads = payloads; } - public Date getCreationTime() { - return creationTime; - } - - public void setCreationTime(Date creationTime) { - this.creationTime = creationTime; - } - public String getRetryMethod() { return retryMethod; } @@ -136,11 +105,6 @@ public class TxStartedEvent extends TxEvent { return this; } - public Builder creationTime(Date creationTime) { - txStartedEvent.setCreationTime(creationTime); - return this; - } - public Builder retryMethod(String retryMethod) { txStartedEvent.setRetryMethod(retryMethod); return this; diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java index 4ba8372..f3ad9df 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/BaseEvent.java @@ -18,16 +18,38 @@ package org.apache.servicecomb.pack.alpha.fsm.event.base; import java.io.Serializable; +import java.util.Date; public abstract class BaseEvent implements Serializable { + + private String serviceName; + private String instanceId; private String globalTxId; - private long createTime = System.currentTimeMillis(); + private String parentTxId; + private String localTxId; + private Date createTime = new Date(); public BaseEvent() { } - public long getCreateTime() { + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + public Date getCreateTime() { return createTime; } @@ -39,10 +61,35 @@ public abstract class BaseEvent implements Serializable { this.globalTxId = globalTxId; } + public String getParentTxId() { + return parentTxId; + } + + public void setParentTxId(String parentTxId) { + this.parentTxId = parentTxId; + } + + public String getLocalTxId() { + return localTxId; + } + + public void setLocalTxId(String localTxId) { + this.localTxId = localTxId; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + @Override public String toString() { - return "BaseEvent{" + - "globalTxId='" + globalTxId + '\'' + + return this.getClass().getSimpleName()+"{" + + "serviceName='" + serviceName + '\'' + + ", instanceId='" + instanceId + '\'' + + ", globalTxId='" + globalTxId + '\'' + + ", parentTxId='" + parentTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", createTime=" + createTime + '}'; } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java index 686400d..b70bed3 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/SagaEvent.java @@ -19,13 +19,4 @@ package org.apache.servicecomb.pack.alpha.fsm.event.base; public class SagaEvent extends BaseEvent { - public SagaEvent() { - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "{" + - "globalTxId='" + this.getGlobalTxId() + '\'' + - '}'; - } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java index 61c3656..c543342 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/base/TxEvent.java @@ -18,49 +18,5 @@ package org.apache.servicecomb.pack.alpha.fsm.event.base; public abstract class TxEvent extends BaseEvent { - private String serviceName; - private String instanceId; - private String parentTxId; - private String localTxId; - public String getParentTxId() { - return parentTxId; - } - - public void setParentTxId(String parentTxId) { - this.parentTxId = parentTxId; - } - - public String getLocalTxId() { - return localTxId; - } - - public void setLocalTxId(String localTxId) { - this.localTxId = localTxId; - } - - public String getServiceName() { - return serviceName; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public String getInstanceId() { - return instanceId; - } - - public void setInstanceId(String instanceId) { - this.instanceId = instanceId; - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "{" + - "globalTxId='" + this.getGlobalTxId() + '\'' + - "parentTxId='" + parentTxId + '\'' + - ", localTxId='" + localTxId + '\'' + - '}'; - } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java index 01a7327..c14fac7 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventConsumer.java @@ -62,13 +62,6 @@ public class SagaEventConsumer { saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId()); sagaCache.put(event.getGlobalTxId(), saga); } -// String actorPath = "/user/" + event.getGlobalTxId(); -// Optional<ActorRef> optional = this.getActorRefFromPath(actorPath); -// if (!optional.isPresent()) { -// saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId()); -// } else { -// saga = optional.get(); -// } saga.tell(event, ActorRef.noSender()); if(LOG.isDebugEnabled()){ LOG.debug("tell {} to {}", event.toString(),saga); diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java index 4908029..59c1e4e 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java @@ -18,34 +18,58 @@ package org.apache.servicecomb.pack.alpha.fsm.model; import java.io.Serializable; +import java.util.Date; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import org.apache.servicecomb.pack.alpha.core.fsm.PackSagaEvent; import org.apache.servicecomb.pack.alpha.fsm.SagaActorState; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; public class SagaData implements Serializable { - private long beginTime = System.currentTimeMillis(); - private long endTime; + private String serviceName; + private String instanceId; + private Date beginTime = new Date(); + private Date endTime; private String globalTxId; - private long expirationTime; + private Date expirationTime; private boolean terminated; private SagaActorState lastState; private AtomicLong compensationRunningCounter = new AtomicLong(); private Map<String,TxEntity> txEntityMap = new HashMap<>(); + private List<BaseEvent> events = new LinkedList<>(); - public long getBeginTime() { + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + public Date getBeginTime() { return beginTime; } - public void setBeginTime(long beginTime) { + public void setBeginTime(Date beginTime) { this.beginTime = beginTime; } - public long getEndTime() { + public Date getEndTime() { return endTime; } - public void setEndTime(long endTime) { + public void setEndTime(Date endTime) { this.endTime = endTime; } @@ -57,11 +81,11 @@ public class SagaData implements Serializable { this.globalTxId = globalTxId; } - public long getExpirationTime() { + public Date getExpirationTime() { return expirationTime; } - public void setExpirationTime(long expirationTime) { + public void setExpirationTime(Date expirationTime) { this.expirationTime = expirationTime; } @@ -100,7 +124,31 @@ public class SagaData implements Serializable { } public long getTimeout(){ - return expirationTime-System.currentTimeMillis(); + return expirationTime.getTime()-beginTime.getTime(); + } + + public void logEvent(BaseEvent event){ + this.events.add(event); + } + + public List<BaseEvent> getEvents() { + return events; + } + + public List<PackSagaEvent> toPackSagaEventList(){ + List<PackSagaEvent> packSagaEventList = new LinkedList<>(); + events.forEach(event -> { + packSagaEventList.add(PackSagaEvent.builder() + .serviceName(serviceName) + .instanceId(instanceId) + .globalTxId(globalTxId) + .localTxId(event.getLocalTxId()) + .parentTxId(event.getParentTxId()) + .creationTime(event.getCreateTime()) + .type(event.getClass().getSimpleName()) + .build()); + }); + return packSagaEventList; } public static Builder builder() { @@ -115,12 +163,12 @@ public class SagaData implements Serializable { sagaData = new SagaData(); } - public Builder beginTime(long beginTime) { + public Builder beginTime(Date beginTime) { sagaData.setBeginTime(beginTime); return this; } - public Builder endTime(long endTime) { + public Builder endTime(Date endTime) { sagaData.setEndTime(endTime); return this; } @@ -130,7 +178,7 @@ public class SagaData implements Serializable { return this; } - public Builder expirationTime(long expirationTime) { + public Builder expirationTime(Date expirationTime) { sagaData.setExpirationTime(expirationTime); return this; } @@ -150,6 +198,16 @@ public class SagaData implements Serializable { return this; } + public Builder serviceName(String serviceName) { + sagaData.setServiceName(serviceName); + return this; + } + + public Builder instanceId(String instanceId) { + sagaData.setInstanceId(instanceId); + return this; + } + public SagaData build() { return sagaData; } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java index 774a2e2..f0d8f58 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java @@ -20,8 +20,7 @@ package org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka; import akka.actor.AbstractExtensionId; import akka.actor.ExtendedActorSystem; import akka.actor.Extension; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SagaDataExt; @@ -35,7 +34,7 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { } public static class SagaDataExt implements Extension { - private Map<String, SagaData> sagaDataMap = new ConcurrentHashMap(); + private ConcurrentSkipListMap<String, SagaData> sagaDataMap = new ConcurrentSkipListMap(); public void putSagaData(String globalTxId, SagaData sagaData){ sagaDataMap.put(globalTxId, sagaData); @@ -44,5 +43,13 @@ public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> { public SagaData getSagaData(String globalTxId){ return sagaDataMap.get(globalTxId); } + + public void clearSagaData(){ + sagaDataMap.clear(); + } + + public SagaData getLastSagaData(){ + return sagaDataMap.lastEntry().getValue(); + } } } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index bcc2447..926ae84 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.pack.alpha.fsm; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import akka.actor.ActorRef; @@ -28,13 +29,16 @@ import akka.testkit.javadsl.TestKit; import com.typesafe.config.ConfigFactory; import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent; import org.apache.servicecomb.pack.alpha.fsm.model.SagaData; import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import static org.hamcrest.CoreMatchers.*; public class SagaActorTest { @@ -98,7 +102,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -140,6 +145,7 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); }}; } @@ -166,8 +172,8 @@ public class SagaActorTest { ActorRef saga = system.actorOf(SagaActor.props(persistenceId)); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3) - .stream().forEach(event -> { + List<BaseEvent> eventListFirst = SagaEventSender.successfulFirstHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventListFirst.stream().forEach(event -> { saga.tell(event, getRef()); }); @@ -194,8 +200,8 @@ public class SagaActorTest { ActorRef recoveredSaga = system.actorOf(SagaActor.props(persistenceId), "recoveredSaga"); watch(recoveredSaga); recoveredSaga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3) - .stream().forEach(event -> { + List<BaseEvent> eventListSecond = SagaEventSender.successfulSecondHalfEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventListSecond.stream().forEach(event -> { recoveredSaga.tell(event, getRef()); }); @@ -227,6 +233,10 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), recoveredSaga); + + eventListFirst.addAll(eventListSecond); + assertThat(eventListFirst, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -246,8 +256,8 @@ public class SagaActorTest { ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - - SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1); + eventList.forEach( event -> { saga.tell(event, getRef()); }); @@ -276,6 +286,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -301,7 +313,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -337,6 +350,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -365,7 +380,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -408,6 +424,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -436,7 +454,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -481,6 +500,8 @@ public class SagaActorTest { assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -509,7 +530,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -540,6 +562,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -570,7 +594,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -616,6 +641,8 @@ public class SagaActorTest { assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED); assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -643,7 +670,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -685,6 +713,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -711,7 +741,8 @@ public class SagaActorTest { watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -745,6 +776,9 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -772,7 +806,8 @@ public class SagaActorTest { ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -802,6 +837,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -827,7 +864,8 @@ public class SagaActorTest { ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -863,6 +901,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } @@ -890,7 +930,8 @@ public class SagaActorTest { ActorRef saga = system.actorOf(SagaActor.props(genPersistenceId())); watch(saga); saga.tell(new PersistentFSM.SubscribeTransitionCallBack(getRef()), getRef()); - SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + List<BaseEvent> eventList = SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3); + eventList.stream().forEach( event -> { saga.tell(event, getRef()); }); @@ -924,6 +965,8 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + assertThat(eventList, is(sagaData.getEvents())); + system.stop(saga); }}; } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java index 925bbc2..ca67fb4 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaEventSender.java @@ -47,14 +47,14 @@ public class SagaEventSender { */ public static List<BaseEvent> successfulEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -66,10 +66,10 @@ public class SagaEventSender { */ public static List<BaseEvent> firstTxAbortedEvents(String globalTxId, String localTxId_1){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -84,13 +84,13 @@ public class SagaEventSender { */ public static List<BaseEvent> middleTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -108,16 +108,16 @@ public class SagaEventSender { */ public static List<BaseEvent> lastTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -135,16 +135,16 @@ public class SagaEventSender { */ public static List<BaseEvent> sagaAbortedEventBeforeTxComponsitedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); return sagaEvents; } @@ -162,16 +162,16 @@ public class SagaEventSender { */ public static List<BaseEvent> receivedRemainingEventAfterFirstTxAbortedEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -190,17 +190,17 @@ public class SagaEventSender { */ public static List<BaseEvent> sagaAbortedEventAfterAllTxEndedsEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); return sagaEvents; } @@ -216,14 +216,14 @@ public class SagaEventSender { */ public static List<BaseEvent> omegaSendSagaTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaTimeoutEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaTimeoutEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -238,13 +238,13 @@ public class SagaEventSender { */ public static List<BaseEvent> sagaActorTriggerTimeoutEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3, int timeout){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).timeout(timeout).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).timeout(timeout).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); return sagaEvents; } @@ -260,14 +260,14 @@ public class SagaEventSender { */ public static List<BaseEvent> successfulWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -283,14 +283,14 @@ public class SagaEventSender { */ public static List<BaseEvent> successfulWithTxConcurrentCrossEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -308,16 +308,16 @@ public class SagaEventSender { */ public static List<BaseEvent> lastTxAbortedEventWithTxConcurrentEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxAbortedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxCompensatedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(SagaAbortedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxAbortedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxCompensatedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(SagaAbortedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } @@ -330,10 +330,10 @@ public class SagaEventSender { */ public static List<BaseEvent> successfulFirstHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(SagaStartedEvent.builder().globalTxId(globalTxId).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); return sagaEvents; } @@ -345,10 +345,10 @@ public class SagaEventSender { */ public static List<BaseEvent> successfulSecondHalfEvents(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){ List<BaseEvent> sagaEvents = new ArrayList<>(); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); - sagaEvents.add(TxStartedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(TxEndedEvent.builder().globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); - sagaEvents.add(SagaEndedEvent.builder().globalTxId(globalTxId).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build()); + sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build()); + sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build()); return sagaEvents; } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index bc4a9a6..f408c09 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -19,6 +19,8 @@ package org.apache.servicecomb.pack.alpha.fsm; import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import akka.actor.ActorSystem; import com.google.common.eventbus.EventBus; @@ -35,7 +37,7 @@ import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest(classes = {SagaApplication.class}, properties = { - "alpha.model.actor.enabled=true", + "alpha.feature.akka.enabled=true", "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem", "akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal", "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local", @@ -59,21 +61,17 @@ public class SagaIntegrationTest { SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMMITTED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED); } @Test @@ -84,18 +82,15 @@ public class SagaIntegrationTest { sagaEventBus.post(event); }); - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMPENSATED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 1 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),1); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED); } @Test @@ -106,20 +101,16 @@ public class SagaIntegrationTest { SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMPENSATED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 2 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.FAILED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),2); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.FAILED); } @Test @@ -131,47 +122,39 @@ public class SagaIntegrationTest { SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMPENSATED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED); } @Test public void sagaAbortedEventBeforeTxComponsitedEventTest() { - final String globalTxId = UUID.randomUUID().toString(); - final String localTxId_1 = UUID.randomUUID().toString(); - final String localTxId_2 = UUID.randomUUID().toString(); - final String localTxId_3 = UUID.randomUUID().toString(); - SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { - sagaEventBus.post(event); - }); - - await().atMost(10, SECONDS).until(() -> { + final String globalTxId = UUID.randomUUID().toString(); + final String localTxId_1 = UUID.randomUUID().toString(); + final String localTxId_2 = UUID.randomUUID().toString(); + final String localTxId_3 = UUID.randomUUID().toString(); + SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { + sagaEventBus.post(event); + }); + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMPENSATED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED); } @Test @@ -183,21 +166,17 @@ public class SagaIntegrationTest { SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMPENSATED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.FAILED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.COMPENSATED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.FAILED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED); } @Test @@ -209,21 +188,17 @@ public class SagaIntegrationTest { SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMPENSATED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.COMPENSATED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMPENSATED); } @Test @@ -235,21 +210,17 @@ public class SagaIntegrationTest { SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.SUSPENDED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED); } @Test @@ -262,21 +233,17 @@ public class SagaIntegrationTest { SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(timeout + 2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.SUSPENDED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED); } @Test @@ -288,21 +255,17 @@ public class SagaIntegrationTest { SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMMITTED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED); } @Test @@ -314,21 +277,17 @@ public class SagaIntegrationTest { SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMMITTED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMMITTED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.COMMITTED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.COMMITTED); } @Test @@ -340,21 +299,17 @@ public class SagaIntegrationTest { SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> { sagaEventBus.post(event); }); - - await().atMost(10, SECONDS).until(() -> { + await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - if(sagaData != null){ - return sagaData.getLastState() == SagaActorState.COMPENSATED - && sagaData.getBeginTime() > 0 - && sagaData.getEndTime() >0 - && sagaData.getTxEntityMap().size() == 3 - && sagaData.getTxEntityMap().get(localTxId_1).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_2).getState() == TxState.COMPENSATED - && sagaData.getTxEntityMap().get(localTxId_3).getState() == TxState.FAILED; - }else{ - return false; - } + return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; }); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertNotNull(sagaData.getBeginTime()); + assertNotNull(sagaData.getEndTime()); + assertEquals(sagaData.getTxEntityMap().size(),3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(),TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(),TxState.FAILED); } }
