This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit e683fea4e97df8406495e69badb24999e454c52a
Author: Eric Lee <[email protected]>
AuthorDate: Mon Jan 29 16:37:16 2018 +0800

    SCB-239 compensate immediately after event was aborted
    
    Signed-off-by: Eric Lee <[email protected]>
---
 .../servicecomb/saga/alpha/core/EventScanner.java  |  4 +++
 .../saga/alpha/core/TxConsistentService.java       |  2 +-
 .../servicecomb/saga/alpha/core/TxEvent.java       |  1 +
 .../saga/alpha/core/TxConsistentServiceTest.java   | 14 ++++++---
 .../alpha/server/AlphaEventControllerTest.java     |  5 ----
 .../saga/alpha/server/AlphaIntegrationTest.java    | 34 +++++++++++++---------
 6 files changed, 37 insertions(+), 23 deletions(-)

diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
index c10c09a..4f72a1c 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java
@@ -119,6 +119,10 @@ public class EventScanner implements Runnable {
 
       eventRepository.save(toTxAbortedEvent(event));
       timeoutRepository.markTxTimeoutAsDone(event.globalTxId(), 
event.localTxId());
+
+      if (event.type().equals(TxStartedEvent.name())) {
+        omegaCallback.compensate(event);
+      }
     });
   }
 
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
index 26309b6..541d54f 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxConsistentService.java
@@ -52,7 +52,7 @@ public class TxConsistentService {
     }
 
     if (isEventWithTimeout(event)) {
-      CompletableFuture.runAsync(() -> saveTxTimeout(event));
+      saveTxTimeout(event);
     }
 
     eventRepository.save(event);
diff --git 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
index 5966541..e34b7c6 100644
--- 
a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
+++ 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java
@@ -61,6 +61,7 @@ public class TxEvent {
         event.parentTxId,
         event.type,
         event.compensationMethod,
+        0,
         event.payloads);
   }
 
diff --git 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
index 43d3164..5467368 100644
--- 
a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
+++ 
b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.stream.Collectors;
 
 import org.apache.servicecomb.saga.common.EventType;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TxConsistentServiceTest {
@@ -87,8 +88,7 @@ public class TxConsistentServiceTest {
     @Override
     public void markTxTimeoutAsDone(String globalTxId, String localTxId) {
       for (TxTimeout timeout : timeouts) {
-        if (timeout.globalTxId().equals(globalTxId) &&
-            timeout.localTxId().equals(localTxId)) {
+        if (timeout.globalTxId().equals(globalTxId) && 
timeout.localTxId().equals(localTxId)) {
           timeout.setStatus(DONE.name());
           break;
         }
@@ -112,6 +112,12 @@ public class TxConsistentServiceTest {
   private final TxConsistentService consistentService = new 
TxConsistentService(eventRepository, timeoutRepository);
   private final byte[] payloads = "yeah".getBytes();
 
+  @Before
+  public void setUp() throws Exception {
+    events.clear();
+    timeouts.clear();
+  }
+
   @Test
   public void persistEventOnArrival() throws Exception {
     TxEvent[] events = {
@@ -146,8 +152,8 @@ public class TxConsistentServiceTest {
   @Test
   public void persistTimeoutEventOnArrival() {
     TxEvent[] events = {
-        newEventWithTimeout(SagaStartedEvent, globalTxId,3),
-        newEventWithTimeout(TxStartedEvent, 2),
+        newEventWithTimeout(SagaStartedEvent, globalTxId,2),
+        newEventWithTimeout(TxStartedEvent, 1),
         newEvent(TxEndedEvent),
         newEvent(TxCompensatedEvent),
         eventOf(SagaEndedEvent, globalTxId)};
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
index a479e2b..c212b3b 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java
@@ -27,7 +27,6 @@ import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilder
 import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static 
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
-import java.util.Date;
 import java.util.UUID;
 
 import org.apache.servicecomb.saga.alpha.core.TxEvent;
@@ -40,13 +39,10 @@ import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 @RunWith(SpringRunner.class)
 @WebMvcTest(AlphaEventController.class)
 public class AlphaEventControllerTest {
   private final TxEvent someEvent = someEvent();
-  private final ObjectMapper mapper = new ObjectMapper();
 
   @Autowired
   private MockMvc mockMvc;
@@ -72,7 +68,6 @@ public class AlphaEventControllerTest {
     return new TxEvent(
         uniquify("serviceName"),
         uniquify("instanceId"),
-        new Date(),
         uniquify("globalTxId"),
         uniquify("localTxId"),
         UUID.randomUUID().toString(),
diff --git 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 0073648..225c194 100644
--- 
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -383,9 +383,9 @@ public class AlphaIntegrationTest {
     asyncStub.onConnected(serviceConfig, compensateResponseObserver);
     blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, 
globalTxId, null, 1));
 
-    assertThat(timeoutEntityRepository.count(), is(1L));
-    TxTimeout timeout = timeoutEntityRepository.findOne(1L);
-    assertThat(timeout.status(), is(NEW.name()));
+    await().atMost(1, SECONDS).until(() -> timeoutEntityRepository.count() == 
1L);
+    Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
+    timeouts.forEach(timeout -> assertThat(timeout.status(), is(NEW.name())));
 
     await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3);
 
@@ -395,10 +395,12 @@ public class AlphaIntegrationTest {
     assertThat(events.get(2).type(), is(SagaEndedEvent.name()));
 
     assertThat(timeoutEntityRepository.count(), is(1L));
-    timeout = timeoutEntityRepository.findOne(1L);
-    assertThat(timeout.status(), is(DONE.name()));
-    assertThat(timeout.globalTxId(), is(globalTxId));
-    assertThat(timeout.localTxId(), is(globalTxId));
+    timeouts = timeoutEntityRepository.findAll();
+    timeouts.forEach(timeout -> {
+      assertThat(timeout.status(), is(DONE.name()));
+      assertThat(timeout.globalTxId(), is(globalTxId));
+      assertThat(timeout.localTxId(), is(globalTxId));
+    });
   }
 
   @Test
@@ -407,19 +409,25 @@ public class AlphaIntegrationTest {
     blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, 
globalTxId));
     blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, 
globalTxId, 1));
 
-    await().atMost(2, SECONDS).until(() -> eventRepo.count() == 4);
+    await().atMost(2, SECONDS).until(() -> {
+      List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
+      return eventRepo.count() == 5 && events.get(events.size() - 
1).type().equals(SagaEndedEvent.name());
+    });
 
     List<TxEvent> events = eventRepo.findByGlobalTxId(globalTxId);
     assertThat(events.get(0).type(), is(SagaStartedEvent.name()));
     assertThat(events.get(1).type(), is(TxStartedEvent.name()));
     assertThat(events.get(2).type(), is(TxAbortedEvent.name()));
-    assertThat(events.get(3).type(), is(SagaEndedEvent.name()));
+    assertThat(events.get(3).type(), is(TxCompensatedEvent.name()));
+    assertThat(events.get(4).type(), is(SagaEndedEvent.name()));
 
     assertThat(timeoutEntityRepository.count(), is(1L));
-    TxTimeout timeout = timeoutEntityRepository.findOne(1L);
-    assertThat(timeout.status(), is(DONE.name()));
-    assertThat(timeout.globalTxId(), is(globalTxId));
-    assertThat(timeout.localTxId(), is(localTxId));
+    Iterable<TxTimeout> timeouts = timeoutEntityRepository.findAll();
+    timeouts.forEach(timeout -> {
+      assertThat(timeout.status(), is(DONE.name()));
+      assertThat(timeout.globalTxId(), is(globalTxId));
+      assertThat(timeout.localTxId(), is(localTxId));
+    });
   }
 
   private GrpcAck onCompensation(GrpcCompensateCommand command) {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to