This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit e683fea4e97df8406495e69badb24999e454c52a Author: Eric Lee <[email protected]> AuthorDate: Mon Jan 29 16:37:16 2018 +0800 SCB-239 compensate immediately after event was aborted Signed-off-by: Eric Lee <[email protected]> --- .../servicecomb/saga/alpha/core/EventScanner.java | 4 +++ .../saga/alpha/core/TxConsistentService.java | 2 +- .../servicecomb/saga/alpha/core/TxEvent.java | 1 + .../saga/alpha/core/TxConsistentServiceTest.java | 14 ++++++--- .../alpha/server/AlphaEventControllerTest.java | 5 ---- .../saga/alpha/server/AlphaIntegrationTest.java | 34 +++++++++++++--------- 6 files changed, 37 insertions(+), 23 deletions(-) diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index c10c09a..4f72a1c 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -119,6 +119,10 @@ public class EventScanner implements Runnable { eventRepository.save(toTxAbortedEvent(event)); timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), event.localTxId()); + + if (event.type().equals(TxStartedEvent.name())) { + omegaCallback.compensate(event); + } }); } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java index 26309b6..541d54f 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java @@ -52,7 +52,7 @@ public class TxConsistentService { } if (isEventWithTimeout(event)) { - CompletableFuture.runAsync(() -> saveTxTimeout(event)); + saveTxTimeout(event); } eventRepository.save(event); diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 5966541..e34b7c6 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -61,6 +61,7 @@ public class TxEvent { event.parentTxId, event.type, event.compensationMethod, + 0, event.payloads); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 43d3164..5467368 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.stream.Collectors; import org.apache.servicecomb.saga.common.EventType; +import org.junit.Before; import org.junit.Test; public class TxConsistentServiceTest { @@ -87,8 +88,7 @@ public class TxConsistentServiceTest { @Override public void markTxTimeoutAsDone(String globalTxId, String localTxId) { for (TxTimeout timeout : timeouts) { - if (timeout.globalTxId().equals(globalTxId) && - timeout.localTxId().equals(localTxId)) { + if (timeout.globalTxId().equals(globalTxId) && timeout.localTxId().equals(localTxId)) { timeout.setStatus(DONE.name()); break; } @@ -112,6 +112,12 @@ public class TxConsistentServiceTest { private final TxConsistentService consistentService = new TxConsistentService(eventRepository, timeoutRepository); private final byte[] payloads = "yeah".getBytes(); + @Before + public void setUp() throws Exception { + events.clear(); + timeouts.clear(); + } + @Test public void persistEventOnArrival() throws Exception { TxEvent[] events = { @@ -146,8 +152,8 @@ public class TxConsistentServiceTest { @Test public void persistTimeoutEventOnArrival() { TxEvent[] events = { - newEventWithTimeout(SagaStartedEvent, globalTxId,3), - newEventWithTimeout(TxStartedEvent, 2), + newEventWithTimeout(SagaStartedEvent, globalTxId,2), + newEventWithTimeout(TxStartedEvent, 1), newEvent(TxEndedEvent), newEvent(TxCompensatedEvent), eventOf(SagaEndedEvent, globalTxId)}; diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java index a479e2b..c212b3b 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java @@ -27,7 +27,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import java.util.Date; import java.util.UUID; import org.apache.servicecomb.saga.alpha.core.TxEvent; @@ -40,13 +39,10 @@ import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; -import com.fasterxml.jackson.databind.ObjectMapper; - @RunWith(SpringRunner.class) @WebMvcTest(AlphaEventController.class) public class AlphaEventControllerTest { private final TxEvent someEvent = someEvent(); - private final ObjectMapper mapper = new ObjectMapper(); @Autowired private MockMvc mockMvc; @@ -72,7 +68,6 @@ public class AlphaEventControllerTest { return new TxEvent( uniquify("serviceName"), uniquify("instanceId"), - new Date(), uniquify("globalTxId"), uniquify("localTxId"), UUID.randomUUID().toString(), diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 0073648..225c194 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -383,9 +383,9 @@ public class AlphaIntegrationTest { asyncStub.onConnected(serviceConfig, compensateResponseObserver); blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1)); - assertThat(timeoutEntityRepository.count(), is(1L)); - TxTimeout timeout = timeoutEntityRepository.findOne(1L); - assertThat(timeout.status(), is(NEW.name())); + await().atMost(1, SECONDS).until(() -> timeoutEntityRepository.count() == 1L); + Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll(); + timeouts.forEach(timeout -> assertThat(timeout.status(), is(NEW.name()))); await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3); @@ -395,10 +395,12 @@ public class AlphaIntegrationTest { assertThat(events.get(2).type(), is(SagaEndedEvent.name())); assertThat(timeoutEntityRepository.count(), is(1L)); - timeout = timeoutEntityRepository.findOne(1L); - assertThat(timeout.status(), is(DONE.name())); - assertThat(timeout.globalTxId(), is(globalTxId)); - assertThat(timeout.localTxId(), is(globalTxId)); + timeouts = timeoutEntityRepository.findAll(); + timeouts.forEach(timeout -> { + assertThat(timeout.status(), is(DONE.name())); + assertThat(timeout.globalTxId(), is(globalTxId)); + assertThat(timeout.localTxId(), is(globalTxId)); + }); } @Test @@ -407,19 +409,25 @@ public class AlphaIntegrationTest { blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId)); blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1)); - await().atMost(2, SECONDS).until(() -> eventRepo.count() == 4); + await().atMost(2, SECONDS).until(() -> { + List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); + return eventRepo.count() == 5 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name()); + }); List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId); assertThat(events.get(0).type(), is(SagaStartedEvent.name())); assertThat(events.get(1).type(), is(TxStartedEvent.name())); assertThat(events.get(2).type(), is(TxAbortedEvent.name())); - assertThat(events.get(3).type(), is(SagaEndedEvent.name())); + assertThat(events.get(3).type(), is(TxCompensatedEvent.name())); + assertThat(events.get(4).type(), is(SagaEndedEvent.name())); assertThat(timeoutEntityRepository.count(), is(1L)); - TxTimeout timeout = timeoutEntityRepository.findOne(1L); - assertThat(timeout.status(), is(DONE.name())); - assertThat(timeout.globalTxId(), is(globalTxId)); - assertThat(timeout.localTxId(), is(localTxId)); + Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll(); + timeouts.forEach(timeout -> { + assertThat(timeout.status(), is(DONE.name())); + assertThat(timeout.globalTxId(), is(globalTxId)); + assertThat(timeout.localTxId(), is(localTxId)); + }); } private GrpcAck onCompensation(GrpcCompensateCommand command) { -- To stop receiving notification emails like this one, please contact [email protected].
