This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit ffaa350c987ec77c43209cf990d6f9e2bcab2560 Author: seanyinx <[email protected]> AuthorDate: Fri Jan 19 17:01:45 2018 +0800 SCB-218 made sure saga ended event is always the last event Signed-off-by: seanyinx <[email protected]> --- .../servicecomb/saga/alpha/core/EventScanner.java | 3 +-- .../servicecomb/saga/alpha/server/AlphaConfig.java | 2 +- .../saga/alpha/server/AlphaIntegrationTest.java | 29 +++++++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index f9fa3be..80deeb3 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -100,8 +100,6 @@ public class EventScanner implements Runnable { } } - // TODO: 2018/1/13 SagaEndedEvent may still not be the last, because some omegas may have slow network and its TxEndedEvent reached late, - // unless we ask user to specify a name for each participant in the global TX in @Compensable private void updateCompensationStatus(TxEvent event) { commandRepository.markCommandAsDone(event.globalTxId(), event.localTxId()); log.info("Transaction with globalTxId {} and localTxId {} was compensated", @@ -135,6 +133,7 @@ public class EventScanner implements Runnable { EMPTY_PAYLOAD); } + // TODO: 2018/1/19 potentially compensation may be out of order if we don't wait till received compensated event for the previous one, since compensation is async private void compensate() { commandRepository.findFirstCommandToCompensate() .forEach(command -> { diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index 769ee5a..35352f4 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -69,7 +69,7 @@ class AlphaConfig { @Bean ScheduledExecutorService compensationScheduler() { - return Executors.newScheduledThreadPool(2); + return Executors.newScheduledThreadPool(1); } @Bean diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index a928443..5dddc1d 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.servicecomb.saga.alpha.server; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; @@ -30,15 +31,18 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; import java.util.function.Consumer; import javax.annotation.PostConstruct; import org.apache.servicecomb.saga.alpha.core.CommandRepository; +import org.apache.servicecomb.saga.alpha.core.EventScanner; import org.apache.servicecomb.saga.alpha.core.OmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEvent; @@ -345,6 +349,23 @@ public class AlphaIntegrationTest { assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId)); } + @Test + public void sagaEndedEventIsAlwaysInTheEnd() throws Exception { + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent)); + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent)); + + String anotherLocalTxId = UUID.randomUUID().toString(); + blockingStub.onTxEvent(someGrpcEvent(TxStartedEvent, globalTxId, anotherLocalTxId)); + blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId)); + + blockingStub.onTxEvent(someGrpcEvent(TxEndedEvent, globalTxId, anotherLocalTxId)); + + await().atMost(1, SECONDS).until(() -> eventRepo.count() == 8); + List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); + assertThat(events.get(events.size() - 1).type(), is(SagaEndedEvent.name())); + } + private GrpcAck onCompensation(GrpcCompensateCommand command) { return blockingStub.onTxEvent( eventOf(TxCompensatedEvent, @@ -445,6 +466,12 @@ public class AlphaIntegrationTest { @PostConstruct void init() { -// new EventScanner(Executors.newScheduledThreadPool(2), eventRepository, commandRepository, omegaCallback, 1, 1).run(); + // simulates concurrent db connections + new EventScanner( + Executors.newSingleThreadScheduledExecutor(), + eventRepository, + commandRepository, + omegaCallback, + 1).run(); } } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
