This is an automated email from the ASF dual-hosted git repository. zhanglei pushed a commit to branch SCB-1321 in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit a93055e6bc413c4cb66d9ff30be0d5e71634b759 Author: Lei Zhang <[email protected]> AuthorDate: Wed Jul 10 18:47:27 2019 +0800 SCB-1321 Remove Actor replying methods --- .../servicecomb/pack/alpha/fsm/SagaActor.java | 118 +++++++++-------- .../pack/alpha/fsm/domain/SagaEndedDomain.java | 5 + .../servicecomb/pack/alpha/fsm/SagaActorTest.java | 139 ++++++++++----------- .../pack/alpha/fsm/SagaIntegrationTest.java | 24 ++-- .../alpha/server/fsm/AlphaIntegrationFsmTest.java | 30 ++--- 5 files changed, 163 insertions(+), 153 deletions(-) diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java index 6881148..a6e3d05 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/SagaActor.java @@ -98,21 +98,21 @@ public class SagaActor extends SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) - .replying(data); + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(SagaAbortedEvent.class, (event, data) -> { SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) - .replying(data); + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { SagaEndedDomain domainEvent = new SagaEndedDomain(null, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) - .replying(data); + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); }) ); @@ -120,7 +120,7 @@ public class SagaActor extends matchEvent(TxEndedEvent.class, SagaData.class, (event, data) -> { UpdateTxEventDomain domainEvent = new UpdateTxEventDomain(event); - if (data.getExpirationTime() != null) { + if (data.getExpirationTime() != null) { return goTo(SagaActorState.PARTIALLY_COMMITTED) .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); @@ -132,7 +132,7 @@ public class SagaActor extends ).event(TxStartedEvent.class, (event, data) -> { AddTxEventDomain domainEvent = new AddTxEventDomain(event); - if (data.getExpirationTime() != null) { + if (data.getExpirationTime() != null) { return stay() .applying(domainEvent) .forMax(Duration.create(data.getTimeout(), TimeUnit.MILLISECONDS)); @@ -145,7 +145,6 @@ public class SagaActor extends SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) - .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(TxAbortedEvent.class, @@ -156,7 +155,8 @@ public class SagaActor extends } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { - return goTo(SagaActorState.SUSPENDED).replying(data); + return goTo(SagaActorState.SUSPENDED) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); }) ); @@ -189,7 +189,6 @@ public class SagaActor extends SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) - .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(SagaEndedEvent.class, @@ -197,7 +196,6 @@ public class SagaActor extends SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMMITTED); return goTo(SagaActorState.COMMITTED) .applying(domainEvent) - .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(SagaAbortedEvent.class, @@ -212,7 +210,8 @@ public class SagaActor extends } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { - return goTo(SagaActorState.SUSPENDED).replying(data); + return goTo(SagaActorState.SUSPENDED) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS));//.replying(data); }) ); @@ -222,7 +221,6 @@ public class SagaActor extends SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.SUSPENDED); return goTo(SagaActorState.SUSPENDED) .applying(domainEvent) - .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } ).event(TxCompensatedEvent.class, SagaData.class, @@ -235,12 +233,12 @@ public class SagaActor extends ).event(TxComponsitedCheckInternalEvent.class, SagaData.class, (event, data) -> { if (hasCompensationSentTx(data) || !data.isTerminated()) { - return stay().replying(data); + return stay(); } else { - SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, + SagaActorState.COMPENSATED); return goTo(SagaActorState.COMPENSATED) .applying(domainEvent) - .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } } @@ -249,15 +247,17 @@ public class SagaActor extends data.setTerminated(true); if (hasCommittedTx(data)) { SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); - return stay().replying(data).applying(domainEvent); + return stay() + .applying(domainEvent); } else if (hasCompensationSentTx(data)) { SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.FAILED); - return stay().replying(data).applying(domainEvent); + return stay() + .applying(domainEvent); } else { - SagaEndedDomain domainEvent = new SagaEndedDomain(event, SagaActorState.COMPENSATED); + SagaEndedDomain domainEvent = new SagaEndedDomain(event, + SagaActorState.COMPENSATED); return goTo(SagaActorState.COMPENSATED) .applying(domainEvent) - .replying(data) .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); } } @@ -277,21 +277,35 @@ public class SagaActor extends } ).event(Arrays.asList(StateTimeout()), SagaData.class, (event, data) -> { - return goTo(SagaActorState.SUSPENDED).replying(data); + SagaEndedDomain domainEvent = new SagaEndedDomain(SagaActorState.SUSPENDED); + return goTo(SagaActorState.SUSPENDED) + .applying(domainEvent) + .forMax(Duration.create(1, TimeUnit.MILLISECONDS)); }) ); when(SagaActorState.COMMITTED, matchAnyEvent( (event, data) -> { - /** - * deleteMessages 只会删除redis中actor的数据,但是不会删除actor的highestSequenceNr https://github.com/akka/akka/issues/21181 - * 已停止的 actor highestSequenceNr 需要手动清理,例如 actor 的持久化ID为 3c500008-7b9f-415f-b2fd-e6ad0d455fc1 - * 在Redis中当key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr没有匹配的 - * key=journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1时,表示这个actor已经停止,可以使用以下命令清理 - * del journal:persisted:3c500008-7b9f-415f-b2fd-e6ad0d455fc1:highestSequenceNr - * srem journal:persistenceIds 3c500008-7b9f-415f-b2fd-e6ad0d455fc1 - * */ + // 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理 + // 以下基于 journal-redis 说明: + // 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key + // journal:persistenceIds + // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 + // journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr + // + // 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到 + // 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件 + // 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到 + // 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号 + // + // 何如清理: + // 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理 + // 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr + // 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除 + // 并删除 journal:persisted:item:highestSequenceNr + // + // 目前可以看到的解释是 https://github.com/akka/akka/issues/21181 deleteMessages(lastSequenceNr()); deleteSnapshot(snapshotSequenceNr()); return stop(); @@ -333,7 +347,7 @@ public class SagaActor extends SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) .putSagaData(stateData().getGlobalTxId(), stateData()); } - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug("transition {} {} -> {}", getSelf(), from, to); } }) @@ -342,14 +356,14 @@ public class SagaActor extends onTermination( matchStop( Normal(), (state, data) -> { - if(LOG.isDebugEnabled()){ - LOG.info("stop {} {}", data.getGlobalTxId(), state); + if (LOG.isDebugEnabled()) { + LOG.debug("stop {} {}", data.getGlobalTxId(), state); } data.setTerminated(true); data.setLastState(state); data.setEndTime(new Date()); SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()) - .putSagaData(data.getGlobalTxId(), data); + .stopSagaData(data.getGlobalTxId(), data); } ) ); @@ -359,7 +373,8 @@ public class SagaActor extends @Override public SagaData applyEvent(DomainEvent event, SagaData data) { // log event to SagaData - if(event.getEvent() != null && !(event.getEvent() instanceof TxComponsitedCheckInternalEvent)){ + if (event.getEvent() != null && !(event + .getEvent() instanceof TxComponsitedCheckInternalEvent)) { data.logEvent(event.getEvent()); } if (event instanceof SagaStartedDomain) { @@ -391,9 +406,6 @@ public class SagaActor extends TxEntity txEntity = data.getTxEntityMap().get(domainEvent.getLocalTxId()); txEntity.setEndTime(System.currentTimeMillis()); if (domainEvent.getState() == TxState.COMMITTED) { - // stop - //data.setEndTime(System.currentTimeMillis()); - //data.setTerminated(true); txEntity.setState(domainEvent.getState()); } else if (domainEvent.getState() == TxState.FAILED) { txEntity.setState(domainEvent.getState()); @@ -431,13 +443,17 @@ public class SagaActor extends data.setTerminated(true); } } - LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId()); + if (LOG.isDebugEnabled()) { + LOG.debug("applyEvent: {} {}", stateName(), stateData().getGlobalTxId()); + } return data; } @Override public void onRecoveryCompleted() { - LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId()); + if (LOG.isDebugEnabled()) { + LOG.debug("onRecoveryCompleted: {} {}", stateName(), stateData().getGlobalTxId()); + } } @Override @@ -468,37 +484,37 @@ public class SagaActor extends // increments the compensation running counter by one data.getCompensationRunningCounter().incrementAndGet(); txEntity.setState(TxState.COMPENSATION_SENT); - try{ + try { SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity); LOG.info("compensate {}", txEntity.getLocalTxId()); - }catch (AlphaException ex){ - LOG.error(ex.getMessage(),ex); + } catch (AlphaException ex) { + LOG.error(ex.getMessage(), ex); try { Thread.sleep(1000); } catch (InterruptedException e) { - LOG.error(e.getMessage(),e); + LOG.error(e.getMessage(), e); } - compensation(txEntity,data); - }catch (Exception ex){ - LOG.error("compensation failed "+txEntity.getLocalTxId(), ex); - if(txEntity.getRetries() > 0){ + compensation(txEntity, data); + } catch (Exception ex) { + LOG.error("compensation failed " + txEntity.getLocalTxId(), ex); + if (txEntity.getRetries() > 0) { // which means the retry number - if(txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()){ + if (txEntity.getRetriesCounter().incrementAndGet() < txEntity.getRetries()) { try { Thread.sleep(1000); } catch (InterruptedException e) { - LOG.error(e.getMessage(),e); + LOG.error(e.getMessage(), e); } - compensation(txEntity,data); + compensation(txEntity, data); } - } else if(txEntity.getRetries() == -1){ + } else if (txEntity.getRetries() == -1) { // which means retry it until succeed try { Thread.sleep(1000); } catch (InterruptedException e) { - LOG.error(e.getMessage(),e); + LOG.error(e.getMessage(), e); } - compensation(txEntity,data); + compensation(txEntity, data); } } } diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java index 1f4b216..f5bc708 100644 --- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java +++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/domain/SagaEndedDomain.java @@ -32,6 +32,11 @@ public class SagaEndedDomain implements DomainEvent { this.state = state; } + + public SagaEndedDomain(SagaActorState state) { + this.state = state; + } + public SagaActorState getState() { return state; } diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java index 926ae84..1b4d84b 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java @@ -132,19 +132,18 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - sagaData.getTxEntityMap().forEach((k, v) -> { - assertEquals(v.getState(), TxState.COMMITTED); - }); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); }}; @@ -220,13 +219,6 @@ public class SagaActorTest { assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - sagaData.getTxEntityMap().forEach((k, v) -> { - assertEquals(v.getState(), TxState.COMMITTED); - }); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, recoveredSaga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); @@ -234,6 +226,12 @@ public class SagaActorTest { Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), recoveredSaga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); eventListFirst.addAll(eventListSecond); assertThat(eventListFirst, is(sagaData.getEvents())); @@ -274,18 +272,17 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 1); - assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED); - assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 1); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -337,19 +334,19 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 2); - assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED); - assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 2); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); + assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -410,20 +407,19 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); - assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -484,15 +480,13 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); - SagaData sagaData = expectMsgClass(SagaData.class); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); - sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);//expectMsgClass(SagaData.class); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getGlobalTxId(), globalTxId); assertEquals(sagaData.getTxEntityMap().size(), 3); assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); @@ -548,20 +542,19 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.FAILED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED); - assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.FAILED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -700,19 +693,18 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.SUSPENDED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMMITTED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.COMMITTED); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -824,19 +816,18 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - sagaData.getTxEntityMap().forEach((k, v) -> { - assertEquals(v.getState(), TxState.COMMITTED); - }); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -888,19 +879,18 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_ACTIVE, SagaActorState.PARTIALLY_COMMITTED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - sagaData.getTxEntityMap().forEach((k, v) -> { - assertEquals(v.getState(), TxState.COMMITTED); - }); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.COMMITTED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + sagaData.getTxEntityMap().forEach((k, v) -> { + assertEquals(v.getState(), TxState.COMMITTED); + }); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); @@ -951,20 +941,19 @@ public class SagaActorTest { transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.PARTIALLY_COMMITTED, SagaActorState.FAILED); - SagaData sagaData = expectMsgClass(SagaData.class); - assertEquals(sagaData.getGlobalTxId(), globalTxId); - assertEquals(sagaData.getTxEntityMap().size(), 3); - assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); - assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); - assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); - transition = expectMsgClass(PersistentFSM.Transition.class); assertSagaTransition(transition, saga, SagaActorState.FAILED, SagaActorState.COMPENSATED); Terminated terminated = expectMsgClass(Terminated.class); assertEquals(terminated.getActor(), saga); + SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); + assertEquals(sagaData.getGlobalTxId(), globalTxId); + assertEquals(sagaData.getTxEntityMap().size(), 3); + assertEquals(sagaData.getTxEntityMap().get(localTxId_1).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_2).getState(), TxState.COMPENSATED); + assertEquals(sagaData.getTxEntityMap().get(localTxId_3).getState(), TxState.FAILED); + assertEquals(sagaData.getCompensationRunningCounter().intValue(), 0); assertThat(eventList, is(sagaData.getEvents())); system.stop(saga); diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java index f408c09..c1a61a6 100644 --- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java +++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java @@ -63,7 +63,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -84,7 +84,7 @@ public class SagaIntegrationTest { await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -103,7 +103,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -124,7 +124,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -146,7 +146,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -168,7 +168,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -190,7 +190,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -212,7 +212,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -235,7 +235,7 @@ public class SagaIntegrationTest { }); await().atMost(timeout + 2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -257,7 +257,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -279,7 +279,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); @@ -301,7 +301,7 @@ public class SagaIntegrationTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertNotNull(sagaData.getBeginTime()); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java index ecac3b8..d44d728 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java @@ -99,7 +99,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getTxEntityMap().size(),3); @@ -121,7 +121,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -143,7 +143,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -167,7 +167,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -193,7 +193,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(5, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -225,7 +225,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(5, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -250,7 +250,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(20, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -275,7 +275,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getTxEntityMap().size(),3); @@ -300,7 +300,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(timeout + 1, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.SUSPENDED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.SUSPENDED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getTxEntityMap().size(),3); @@ -324,7 +324,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getTxEntityMap().size(),3); @@ -348,7 +348,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMMITTED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMMITTED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getTxEntityMap().size(),3); @@ -372,7 +372,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getTxEntityMap().size(),3); @@ -396,7 +396,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -436,7 +436,7 @@ public class AlphaIntegrationFsmTest { await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED); @@ -462,7 +462,7 @@ public class AlphaIntegrationFsmTest { }); await().atMost(2, SECONDS).until(() -> { SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); - return sagaData !=null && sagaData.getLastState()==SagaActorState.COMPENSATED; + return sagaData !=null && sagaData.isTerminated() && sagaData.getLastState()==SagaActorState.COMPENSATED; }); SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId); assertEquals(sagaData.getLastState(),SagaActorState.COMPENSATED);
